1
私はカフカを使ってプロデュースを送り、メッセージを消費しています。Kafka ByteArray
プロデューサーは<String, ByteArray>
プロデューサーで作業しています。
私は以下のコードを使用していますが(例ではありますが)、各レコードはちょうど8バイト(コードの下のサンプル出力)になっています。
消費者が単純にバイト配列としてメッセージ全体を取る方法はありますか?
コード:のSystem.outの
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
consumer.subscribe(Arrays.asList(topic));
int i = 0;
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(100);
for (ConsumerRecord<String, byte[]> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
出力:
offset = 1773133, key = 105906453, value = [[email protected]
offset = 1773134, key = 105906453, value = [[email protected]
offset = 1773135, key = 105906453, value = [[email protected]
offset = 1773136, key = 105906453, value = [[email protected]
offset = 1773137, key = 105906177, value = [[email protected]
offset = 1773138, key = 105906177, value = [[email protected]
offset = 1773139, key = 105906177, value = [[email protected]
offset = 1773140, key = 105906177, value = [[email protected]
offset = 1773141, key = 105906177, value = [[email protected]
私は信じているように、メッセージを再作成するために、これらのレコードの負荷を組み立てるためになることを楽しみにしていませんよ私は何かが欠けているし、マニュアルの組み立てがエラーを起こしやすいかもしれません。
Damned!私は今まで私の頭をKafkaの上に持っていましたが、私はこれを逃しました - しかし、私はまだプロデューサーが作り出しているそれぞれのメッセージの名前が何であれ、複数のチャンク/オフセットを取得しています。 –
私は単純に、入ってくるメッセージごとに単一のバイト配列を取得する何らかのリスナ/ループを持つことはできませんか? –
自分で行うのは簡単なことです。地図の各要素に対してメソッドを呼び出すだけです。 KafkaConsumerは依然としてかなり低いレベルのAPIなので、同じバッチでコールバックされるので、実際のプロトコルにデータが表示されます。メッセージごとにシングルコールバックに抽象化するものが必要な場合は、Samza(http://samza.apache.org/)またはKafkaStreams(http://www.confluent.io/blog/)のようなものを調べる必要があります。導入 - カフカ - ストリーム - ストリーム - 処理 - 単純/) –