2017-02-01 10 views
1

クラッシュする前に最後に処理したメッセージからプロデューサを開始する必要があります。幸いにも私は1つのトピックと1つのパーティションと1つのコンシューマを持つ場合です。Golang Kafkaの特定のオフセットからコンシューマを設定する方法10

私はhttps://github.com/Shopify/saramaを試しましたが、それはまだ利用可能ではないようです。 私は現在、すべてのメッセージオフセットをコミットできるように、https://godoc.org/github.com/bsm/sarama-clusterを使用しています。

最後にコミットされたオフセットを取得できません サラマフを上記のオフセットから開始する方法を見つけることができません。これまでに見つかった唯一のパラメータはConfig.Producer.Offsets.Initialです。

  1. 最後にコミットされたオフセットを取得する方法はありますか?
  2. オフセットがコミットされた最後のメッセージから消費者を開始させるにはどうすればよいですか? OffsetNewestは、消費者が最後に処理したメッセージではなく、最後に生成されたメッセージから開始します。
  3. bsm/sarama-clusterではなくShopify/saramaのみを使用することは可能ですか?

感謝事前に

P.S.私はカフカ10.0を使用しています。オフセットはカフカの店であり、飼い猫の店ではありません。

EDIT1: 部分的な解決策:sarama.OffsetOldestので、すべてのメッセージを取得し、我々は未処理済み1を見つけるまでそれらのすべてをスキップします。

答えて

0

オフセットがすでにパーティションに保存されている場合、sarama-clusterはそのオフセットからの消費を再開します。 Config.Producer.Offsets.Initialオプションは、保存されたオフセットが存在しない場合(消費者グループに対して最初に実行される)にのみ使用されます。

あなたのmain()関数の先頭に以下の行を追加することでこれを確認することができます

cluster/consumer CID-17db1be4-a162-411c-a106-4d198191176a consume sample/0 from 12

sarama.Logger = log.New(os.Stdout, "sarama: ", log.LstdFlags) 

その後、出力に次のようなものが表示されますその12インチのオフセットは、サラマがそのパーティション(サンプル/ 0)から開始する予定です。

関連する問題