それはAPIドキュメントに言われたよう:
これは、同期コミットされ、コミットが成功するか、回復不能なエラーがあるのいずれかになるまでブロックされます遭遇しました(この場合、それは発信者に投げられます)。
つまり、commitSync
はブロック方法です。それを呼び出すと、スレッドが成功するか失敗するまでブロックされます。例えば
、forループの各反復のため
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
consumer.commitSync();
}
}
、正常に返さまたはスローされた例外で中断のみconsumer.commitSync()
た後、あなたのコードは次の反復に移動します。
これは非同期呼び出しで、ブロックされません。発生したエラーは、コールバックに渡されるか(提供されている場合)、破棄されます。意味
、commitAsync
は、非ブロッキング方式です。それを呼び出してもスレッドはブロックされません。代わりに、最終的に成功するか失敗するかにかかわらず、次の手順の処理を続行します。前の例に似例えば
、が、ここではcommitAsync
を使用します。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
consumer.commitAsync(callback);
}
}
各反復についてforループでは、関係なく、最終的にconsumer.commitAsync()
に何が起こるか、あなたのコードは次へと移動することはありません繰り返し。コミットの結果は、定義したコールバック関数によって処理されます。
トレードオフ:データの一貫対レイテンシー
- あなたは、データの整合性を確保する必要がある場合は、必ず任意のさらなるアクションを行う前に、あなたは意志、ということになりますので、
commitSync()
を選択オフセットコミットが成功したか失敗したかを知る。しかし、同期とブロックのため、コミットが完了するのを待つ時間が長くなり、待ち時間が長くなります。
- 特定のデータの不一致があり、待ち時間が短くなるようにしたい場合は、完了するのを待たずに
commitAsync()
を選択してください。代わりに、コミット要求を送信し、後でKafka(成功または失敗)からの応答を処理します。一方、コードは実行を継続します。
これはすべて一般的に言えば、実際の動作は実際のコードとメソッドを呼び出す場所によって異なります。