2016-12-29 3 views
0

は誰でも以下queries.Iから私を助けることができるがカフカ-クライアント-0.10.1.1(単一ノードシングルブローカー)auto.create.topics.enableのカフカクライアントAPI質問

デフォルト値を使用していますがtrueです。

1.Iは消費する

kafkaProdcuer<String,String> producer> producer... 
    producer.send(new ProducerRecord<String, String>("my- topic","message")); 
    producer.close(); 

を使用してトピックにメッセージを送信しています:

kafkaConsumer<String,String> consumer.... 
    consumer.subscribe(Arrays.asList("my-topic")); 
    ConsumerRecords<String, String> records = consumer.poll(200); 

    while(true){ 
    for (ConsumerRecord<String, String> record : records) { 
      System.out.println(record.value()); 
     } 
    } 

問題は、私が最初に消費者を実行するとき、それは値を取得していないです。そして私はプロデューサーを実行し、消費者を再び動かして値を得る必要があります。いくつかの時間私はプロデューサーを3回実行する必要があります。 これはなぜこのように機能しますか? enable.auto.commitプロパティがfalseの場合

2)enable.auto.commit = falseを

は、同じ消費者がメッセージを複数回を読むことはできますか?

3)第一point.Howで私の消費者のコードを考えると、私は

+0

kafkaビンにはコンソールコンシューマーがあります。消費者がデータを消費できない間に試してみることができます。可能であれば、producer.flush()を追加してみてください。 3番目の質問については、ストリーミングプログラムがバッチの終わりを知る方法はありませんが、タイムアウトスレッドを設定して、データが消費されずにタイムアウトを監視することができます。 – Lhfcws

+0

はいbinコンシューマでテストしましたが、相関ID 1のメタデータを取得するときにエラーが発生しました:{my-topic-106 = LEADER_NOT_AVAILABLE}(org.apache.kafka.clients.NetworkClient) – jena84

+0

データを作成しましたか最近あなたの消費データの前に?デフォルトでは、カフカはあなたのデータを3日間保管しておくだけです。 – Lhfcws

答えて

1

1)私は、消費者はそれがすべてのメッセージを読み、次にconsumer.close(呼び出している知ることができますどのように意味のループを破ることができます)あなたはいつも使用しています消費者の同じgroup.id?あなたは消費する前に生産していますか?これは、消費者グループおよびオフセット管理に関連する可能性があります。 this answer about consumer offset behaviorをご覧ください。

2)意図的にまたは偶然に重複を読み取ることを意味するのかどうかはわかりません。トピック保持ポリシーのために削除されていないメッセージであれば、常に同じ位置に移動して同じメッセージを再度読むことができます。あなたが偶然であれば、自動コミットがfalseに設定されているということは、コンシューマがあなたにオフセットをコミットしないことを意味するだけで、commitSync()またはcommitAsync()を手動でコールする必要があります。いずれにしても、コンシューマーがメッセージを処理し、コミットする前にクラッシュする可能性があります。その場合、コンシューマーがリカバリーされると、コミットされたメッセージは再びコミットされます。あなたが一度だけセマンティックをしたいのであれば、処理されたメッセージでアトミックにオフセットを格納するなど、何か他のことをしなければなりません。

3)前述のように、ストリームには「すべてのメッセージ」のような概念はありません。あなたが行うことができますいくつかのもの(トリック)は、次のとおりです。

  • 空と、時代のいくつかの構成された数がループして終了を破る後の場合は、レコードのリストは、世論調査で返されるかどうかをチェックすることができます。
  • メッセージが注文された場合(1つのパーティションから読み取っている場合)、特殊なEND_OF_DATAメッセージを送信できます。表示すると、消費者を閉じます。
  • 消費者にいくつかのメッセージを読み込ませてから終了させ、次に最後にコミットされたオフセットから続行することができます。
+0

ありがとうございましたLhfcwsとLuciano。私は今、2番目と3番目の点についてはっきりしています.1番目の点については、私はプロデューサーの直後に消費者を実行しています。私は消費者グループを変更していません。 producer.sendというコードがそのトピックを作成すると仮定します。 bootstrap.servers = localhost:9092 group.id = test enable.auto.commit = true – jena84

+1

jena84、コンシューマ設定でauto.offset.resetを "最も早く"設定してもう一度試してみてください。また、消費者がリバランスを完了するのを待ってから開始します。 –

+0

恐ろしい!!!それは働きました。とても大変ありがとうございます。あなたがオフセット管理について与えたリンクの理由のためです。私は「最小」を入れてみました。それは私を許さなかったのです。新しいコンシューマAPIのためですか? – jena84