カフカで同期メッセージを送信するには?
これを達成する1つの方法は、プロパティパラメータ
max.in.flight.requests.per.connection = 1
を設定することです。同期メッセージをkafkaで送信しますか?
しかし、私はカフカで同期メッセージを送信する直接的または代替的な方法があるかどうかを知りたいと思います。 (producer.syncSend(...)などのようなもの)。
カフカで同期メッセージを送信するには?
これを達成する1つの方法は、プロパティパラメータ
max.in.flight.requests.per.connection = 1
を設定することです。同期メッセージをkafkaで送信しますか?
しかし、私はカフカで同期メッセージを送信する直接的または代替的な方法があるかどうかを知りたいと思います。 (producer.syncSend(...)などのようなもの)。
プロデューサAPIは、をsend
から返します。送信が完了するまでブロックするにはFuture#get
に電話をかけることができます。
このexample from the Javadocs参照してください:あなたは、単純なブロッキングをシミュレートしたい場合は、すぐにget()メソッドを呼び出すことができます呼び出す
を:
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record =
new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();
ティロ提案答えはへの道であります行く。一般的に、max.in.flight.requests.per.connection = 1を使用することについてのあなたの提案は、メッセージの順序を失うことなく、有効な再試行を有効にするために使用されます。これは、同期プロデューサを持つためにはあまり使用されません。
あなたはその理由を説明できますか?これがメッセージオーダー保証についてのものであれば、これはもう少し複雑になるかもしれません(同期的に送信しても実際には変更されません)。 – Thilo
同じプロデューサによって同じトピックの同じパーティションに送信されたメッセージは、その順序を保持します。複数のパーティション、トピック、またはプロデューサには順序保証はありません。メッセージが必要な場合は、アプリケーションのコードでコンシューマー側にメッセージを配置する必要があります(たとえば、メッセージのタイムスタンプを見て)。 – Thilo
@Thilo、これは別の質問につながります。パーティションごとまたはトピックごとに、またはプロデューサごとにバッチがありますか?またはこれらのいくつかの組み合わせ。 ? – joveny