異なるキーのメッセージを別のパーティションに保存しようとしています。例えばKafka - 同じパーティションに異なるキーが格納されているメッセージ
:
ProducerRecord<String, String> rec1 = new ProducerRecord<String, String>("topic", "key1", line);
ProducerRecord<String, String> rec2 = new ProducerRecord<String, String>("topic", "key2", line);
producer.send(rec1);
producer.send(rec2);
しかし、私は私のプロデューサークラスを実行しようとすると、それは常に単一のパーティションに保存されています。
ドキュメントごとに、DefaultPartitioner
はmessage key hash code
を使用してパーティションを検索します。 私はこの質問Kafka partition key not working properlyも見ましたが、0.930バージョンのKafkaクライアントライブラリでByteArrayPartitioner
クラスが見つかりません。
props.put("partitioner.class", "kafka.producer.ByteArrayPartitioner")
アップデート:私は、コードを使用してオンザフライでトピックを作成してい。
パーティションを手動で作成すると、正常に動作します。
お返事ありがとうございます。 – Shankar