2

これまでは0.8 APIを使用していました。トピックリストを渡すと、ストリームのマップ(トピックごとに1つのエントリ)が返されます。これにより、別のスレッドを生成し、各トピックのストリームをそのスレッドに割り当てることができます。各トピックにあまりにも多くのデータを持つと、別のスレッドを生成することでマルチタスキングが助長されます。kafka new api 0.10はトピックごとにストリームとコンシューマオブジェクトのリストを提供していません

//0.8 code sample 
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
consumer.createMessageStreams(topicCountMap); 

0.10にアップグレードします。私はKafkaStreamsKafkaConsumerのクラスをチェックしました。 KafkaConsumerオブジェクトはconfigプロパティを受け取り、topicリストを受け取るsubscribeメソッドを提供し、戻り値の型は無効です。私はそれぞれのトピックを扱う方法を見つけることができません。他方、同じ問題を抱えているように思われる。

KStreamBuilder builder = new KStreamBuilder(); 
String [] topics = new String[] {"topic1", "topic2"}; 
KStream<byte[], byte[]> source = builder.stream(stringSerde, stringSerde, topics); 
KafkaStreams streams = new KafkaStreams(builder, props); 
streams.start(); 

利用可能な方法はsource.foreach()ですが、すべてのトピックのストリームです。誰でも、どんなアイデアですか?

答えて

2

まず、スレッドの消費者はトリッキーであるマルチを使用して、このようにあなたが0.8に採用パターンは、うまくいけばうまく設計されています:)

ベストプラクティスは、シングルスレッドの消費者を使用することであり、従って、「不要」にはありません単一の消費者が一度にトピックのリストを購読する場合は、異なるトピックを分離します。それにもかかわらず、レコードオブジェクトを消費している間、レコードオブジェクトは、それがどのトピックから由来するかについての情報を提供する(このメタデータを運ぶ)。したがって、理論的には、実際の処理のためにトピックごとのレコードを別のスレッドにディスパッチできます(これはお勧めしません)。

カフカはパーティションを経由してスケールアウト、シングルスレッドの消費者が負荷を処理できない場合ので、あなたは、消費者の処理能力をスケールアウトする(コンシューマ・グループなど)、複数の消費者を開始する必要があります。

より一般的な質問:トピックごとにデータを処理する場合は、それぞれに1つのトピックにそれぞれサブスクライブする複数のコンシューマを使用しないでください。

最後にカフカストリーム APIが新たに導入されたストリーム処理ライブラリであるApacheのカフカ0.10+ではなく、少なくとも - それは0.8 KafkaStreamクラス(ヒント、何の「s」はありません)と混同してはいけませんけれども。両方ともお互いに完全に無関係です。

+0

質問に答える、はい、トピックごとに複数のコンシューマを作成します。ストリームのマップを返す古いAPIからcreateMessageStreams(topicMapCount)を呼び出すのと比べて、新しいトピックのたびに接続を作成するのが正しいとは思わないことです。とにかく、詳細な答えをありがとう。 – colossal

関連する問題