複数のメッセージを一斉に(new Producer APIを使用して)カフカクラスタに公開すると、メッセージごとにプロデューサからFuture
が得られます。複数のメッセージをKafkaクラスタに確実に配信する
max.in.flight.requests.per.connection = 1
とretries > 0
のように私のプロデューサーを設定したとしたら、私はちょうど最後の未来を待って、すべての前もまた(順番に)配信されていることを確認できますか?あるいは、私はすべての先物を待つ必要がありますか?コードで 、私はこの行うことができます。これに代えて
Producer<String, String> producer = new KafkaProducer<>(myConfig);
Future<?> f = null;
for(MessageType message : messages){
f = producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue());
}
try {
f.get();
} catch(ExecutionException e) {
//handle exception
}
:
Producer<String, String> producer = new KafkaProducer<>(myConfig);
List<Future<?>> futureList = new ArrayList<>();
for(MessageType message : messages){
futureList.add(producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue()));
}
try {
for(Future<?> f : futureList) {
f.get();
}
} catch(ExecutionException e) {
//handle exception
}
と何も(最初のスニペットから)ここでキャッチされていない場合ことが保証さ:
try {
f.get();
} catch(ExecutionException e) {
を
次に私のすべてのメッセージは、(プロデューサーがフードの下で再試行を行ったかどうかに関係なく)何か問題が生じた場合は、最初に問題が発生した最後の未来(私が待っている)でなくても例外が発生しますか?
奇妙なコーナーケースがありますか?
最後の未来を調べるだけでは、以前のリクエストについては何も教えてくれません。また、再試行を有効にすると、障害発生時にメッセージの順序が変更される可能性があります。 – Sebastian
もし私がmax.in.flight.requests.perを持っていれば。connection = 1なら、メッセージの並べ替えを妨げるべきではないでしょうか?以前のバッチが失敗した場合は、少なくとも私の解釈に従って、次のバッチを続行する前に再試行する必要があります。 – Manjabes
それは再試行しますが、それでも 'n '回失敗した場合(' n = numberOfRetries')、次のメッセージに進み、潜在的に正常に挿入されます。このような状況に対処するには、個々の未来の結果を確認する必要があります。 – Sebastian