これまでは0.8 APIを使用していました。トピックリストを渡すと、ストリームのマップ(トピックごとに1つのエントリ)が返されます。これにより、別のスレッドを生成し、各トピックのストリームをそのスレッドに割り当てることができます。各トピックにあまりにも多くのデータを持つと、別のスレッドを生成することでマルチタスキングが助長されます。kafka new api 0.10はトピックごとにストリームとコンシューマオブジェクトのリストを提供していません
//0.8 code sample
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
0.10にアップグレードします。私はKafkaStreams
とKafkaConsumer
のクラスをチェックしました。 KafkaConsumer
オブジェクトはconfigプロパティを受け取り、topicリストを受け取るsubscribeメソッドを提供し、戻り値の型は無効です。私はそれぞれのトピックを扱う方法を見つけることができません。他方、同じ問題を抱えているように思われる。
KStreamBuilder builder = new KStreamBuilder();
String [] topics = new String[] {"topic1", "topic2"};
KStream<byte[], byte[]> source = builder.stream(stringSerde, stringSerde, topics);
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
利用可能な方法はsource.foreach()
ですが、すべてのトピックのストリームです。誰でも、どんなアイデアですか?
質問に答える、はい、トピックごとに複数のコンシューマを作成します。ストリームのマップを返す古いAPIからcreateMessageStreams(topicMapCount)を呼び出すのと比べて、新しいトピックのたびに接続を作成するのが正しいとは思わないことです。とにかく、詳細な答えをありがとう。 – colossal