2016-03-03 10 views
6

高水準のコンシューマAPIは、一度に1つのメッセージを読み取っているようです。カフカにはバッチ消費者がありますか?

SolrやElastic-Searchなどの他のダウンストリームコンシューマにメッセージを処理して提出する場合は、一度に1つではなく一括してメッセージを送信する方が望ましいため、消費者にとっては非常に問題になる可能性があります。

カフカのオフセットも、バッチがすでにコミットされている場合にのみ同期化する必要があるため、これらのメッセージをメモリにバッチするのは簡単ではありません。そうでない場合は、ダウンストリームメッセージがコミットされていないクラッシュしたカフカコンシューマ(SolrまたはES)そのオフセットが既に更新されており、ゆっくりとメッセージが更新されます。

メッセージをダウンストリームにコミットした後、メッセージのオフセットを更新する前にクラッシュした場合、コンシューマはメッセージを複数回消費することがあります。

カフカがメッセージを一括して消費する場合は、コード/ドキュメントへのいくつかのポインタが非常に高く評価されます。

ありがとうございます!

+0

あるカフカのどのバージョンあなたは頼んでいる?あなたがハイレベル消費者について話しているのなら、それは0.8.2かそれ以前です。 – morganw09dev

答えて

3

私はバッチ消費者に気づいていません。しかし、1つでも問題が残っていても、あなたの主な問題はそのままです。データを正常に転送した後にオフセットをコミットしたいとします。これを実現する1つの方法は、プロパティーauto.commit.enable = falseを設定して、コンシューマーの自動コミットをオフにすることです。勿論、オフセットをコミットするときには注意が必要です。

ここでは、消費者のプロパティの完全なドキュメントを探す:https://kafka.apache.org/documentation.html#consumerconfigs

オフセットのjava-docの(https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)から盗まをコミットてmanualyする方法についての良い例:

Properties props = new Properties(); 
props.put("bootstrap.servers", "localhost:9092"); 
props.put("group.id", "test"); 
props.put("enable.auto.commit", "false"); 
props.put("auto.commit.interval.ms", "1000"); 
props.put("session.timeout.ms", "30000"); 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
consumer.subscribe(Arrays.asList("foo", "bar")); 
final int minBatchSize = 200; 
List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); 
while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(100); 
    for (ConsumerRecord<String, String> record : records) { 
     buffer.add(record); 
    } 
    if (buffer.size() >= minBatchSize) { 
     insertIntoDb(buffer); 
     consumer.commitSync(); 
     buffer.clear(); 
    } 
} 
+0

私はあなたのオートコミットの説明に同意します。しかし、あなたのコードが行く限り、ConsumerRecordはKafka 0.9クラスですが、彼の質問は彼が0.9より前の消費者について質問しているように見えます。彼は明示的に述べていないが。 – morganw09dev

+0

上記のコードに問題があります。 – user2250246

+1

オフセットをコミットする前に消費者がクラッシュした場合、メッセージが再生されます。私は、beginTransaction()とendTransaction()のDBを持っていません。 – user2250246

関連する問題