2017-10-03 19 views
0

対commitSync欠点はcommitSyncは()それ はnonretriable失敗、commitAsyncを(成功するかのどちらかに遭遇するまでコミットを再試行する一方ということですhttps://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1カフカ - 消費者。 commitAsync

からの引用) は再試行しません。

このフレーズは私にとっては明らかではありません。私は、消費者がブローカにコミットリクエストを送信し、ブローカがタイムアウト内に応答しない場合は、アウミットが失敗したことを意味します。私が間違っている ?

具体的にはcommitSynccommitAsyncの違いを明確にすることはできますか?
また、コミットの種類を好む場合は、ユースケースを提供してください。

答えて

0

それはAPIドキュメントに言われたよう:


これは、同期コミットされ、コミットが成功するか、回復不能なエラーがあるのいずれかになるまでブロックされます遭遇しました(この場合、それは発信者に投げられます)。

つまり、commitSyncはブロック方法です。それを呼び出すと、スレッドが成功するか失敗するまでブロックされます。例えば

、forループの各反復のため

while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(100); 
    for (ConsumerRecord<String, String> record : records) { 
     System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); 
     consumer.commitSync(); 
    } 
} 

、正常に返さまたはスローされた例外で中断のみconsumer.commitSync()た後、あなたのコードは次の反復に移動します。


これは非同期呼び出しで、ブロックされません。発生したエラーは、コールバックに渡されるか(提供されている場合)、破棄されます。意味

commitAsyncは、非ブロッキング方式です。それを呼び出してもスレッドはブロックされません。代わりに、最終的に成功するか失敗するかにかかわらず、次の手順の処理を続行します。前の例に似例えば

、が、ここではcommitAsyncを使用します。

while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(100); 
    for (ConsumerRecord<String, String> record : records) { 
     System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); 
     consumer.commitAsync(callback); 
    } 
} 

各反復についてforループでは、関係なく、最終的にconsumer.commitAsync()に何が起こるか、あなたのコードは次へと移動することはありません繰り返し。コミットの結果は、定義したコールバック関数によって処理されます。


トレードオフ:データの一貫対レイテンシー

  • あなたは、データの整合性を確保する必要がある場合は、必ず任意のさらなるアクションを行う前に、あなたは意志、ということになりますので、commitSync()を選択オフセットコミットが成功したか失敗したかを知る。しかし、同期とブロックのため、コミットが完了するのを待つ時間が長くなり、待ち時間が長くなります。
  • 特定のデータの不一致があり、待ち時間が短くなるようにしたい場合は、完了するのを待たずにcommitAsync()を選択してください。代わりに、コミット要求を送信し、後でKafka(成功または失敗)からの応答を処理します。一方、コードは実行を継続します。

これはすべて一般的に言えば、実際の動作は実際のコードとメソッドを呼び出す場所によって異なります。

関連する問題