2016-03-29 5 views
7

複数のメッセージを一斉に(new Producer APIを使用して)カフカクラスタに公開すると、メッセージごとにプロデューサからFutureが得られます。複数のメッセージをKafkaクラスタに確実に配信する

max.in.flight.requests.per.connection = 1retries > 0のように私のプロデューサーを設定したとしたら、私はちょうど最後の未来を待って、すべての前もまた(順番に)配信されていることを確認できますか?あるいは、私はすべての先物を待つ必要がありますか?コードで 、私はこの行うことができます。これに代えて

Producer<String, String> producer = new KafkaProducer<>(myConfig); 
Future<?> f = null; 
for(MessageType message : messages){ 
    f = producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue()); 
} 
try { 
    f.get(); 
} catch(ExecutionException e) { 
    //handle exception 
} 

Producer<String, String> producer = new KafkaProducer<>(myConfig); 
List<Future<?>> futureList = new ArrayList<>(); 
for(MessageType message : messages){ 
    futureList.add(producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue())); 
} 
try { 
    for(Future<?> f : futureList) { 
    f.get(); 
    } 
} catch(ExecutionException e) { 
    //handle exception 
} 

と何も(最初のスニペットから)ここでキャッチされていない場合ことが保証さ:

try { 
    f.get(); 
} catch(ExecutionException e) { 

次に私のすべてのメッセージは、(プロデューサーがフードの下で再試行を行ったかどうかに関係なく)何か問題が生じた場合は、最初に問題が発生した最後の未来(私が待っている)でなくても例外が発生しますか?

奇妙なコーナーケースがありますか?

+0

最後の未来を調べるだけでは、以前のリクエストについては何も教えてくれません。また、再試行を有効にすると、障害発生時にメッセージの順序が変更される可能性があります。 – Sebastian

+0

もし私がmax.in.flight.requests.perを持っていれば。connection = 1なら、メッセージの並べ替えを妨げるべきではないでしょうか?以前のバッチが失敗した場合は、少なくとも私の解釈に従って、次のバッチを続行する前に再試行する必要があります。 – Manjabes

+0

それは再試行しますが、それでも 'n '回失敗した場合(' n = numberOfRetries')、次のメッセージに進み、潜在的に正常に挿入されます。このような状況に対処するには、個々の未来の結果を確認する必要があります。 – Sebastian

答えて

1

a)再試行不可能な例外が発生した場合は、無限に(または事実上無限に)再試行を設定し、b)大量のデータを破棄することができます。

もう少し説明すると、カフカには2つのクラスの例外があります。再試行可能な例外は、再実行すると成功する可能性のある失敗です。たとえば、NotEnoughReplicasExceptionは、必要なレプリカ数よりも少ないため、要求が拒否されることを示しています。しかし、失敗したブローカがオンラインに戻った場合、十分なレプリカを持っていて、元の状態に戻って、再度送信するとそのリクエストは成功します。対照的に、SerializationExceptionはretriableではありません。なぜなら、あなたが再度シリアル化しようとすると結果が異なると信じる理由がないからです。

プロデューサの再試行は、リトリーラブルではない例外が発生した時点までのみ適用されます。したがって、これらのどれにも当てはまらず、無限の再試行を使用し、前述の他の設定を使用すれば、最終的な未来が解決されれば、発注と配送が保証されます。ただし、リトリーラブルではない例外が発生する可能性があるため、それぞれの未来(またはコールバック)を処理し、要求が失敗した場合に少なくとも何かをログに記録することを確実にする方がはるかに優れています。

1

さらに、Ewen氏によると、ループ内のすべてのメッセージの送信が完了した後にflush()に電話をかけることもできます。このコールは、すべての先物が完了するまでブロックされます。この後、未来をチェックすることができます。あなたはこれを行うことができるようにするには、すべての未来を握る必要があります。

代わりに、あなたのセンドでコールバックを使用し、返された例外を格納することもできます(下に示すように)。フラッシュの使用は、例外をチェックする前に、すべての送信が完了したことを再度確認します。

Producer<String, String> producer = new KafkaProducer<>(myConfig); 
final ArrayList<Exception> exceptionList = new ArrayList<>(); 

for(MessageType message : messages){ 
    producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() { 
    @Override 
    public void onCompletion(RecordMetadata metadata, Exception exception) { 
     if (exception != null) { 
     exceptionList.add(exception); 
     } 
    } 
    }); 
} 

producer.flush(); 

if (!exceptionList.isEmpty()) { 
    // do stuff 
} 
関連する問題