1

与えられた:私はカフカで2つのトピックを話題AとトピックBとしましょう。カフカストリームはトピックAからレコードを読み取り、処理して複数のレコードを生成します(recordAとrecordBとしましょう)消費されたレコードに対応する。さて、カフカストリームを使ってこれをどうやって達成できるのかという疑問があります。カフカストリーム:複数のレコードに1つのレコード

KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() { 
     @Override 
     public List<Message> apply(final Message message) { 
      return consumerRecordHandler.process(message); 
     } 
    }).*someFunction*() 

ここで、読み取られたレコードはメッセージです。処理後、メッセージのリストを返します。このリストを2つのプロデューサストリームにどのように分けることができますか?どんな助けもありがとう。

答えて

5

だので、私はないですあなたが質問を正しく理解しているか確かめてください。また、@Abhishekの回答も理解できません:(

入力ストリームがあり、入力レコードごとに0個、1個以上の出力レコードを取得する場合は、 flatMap()またはflatMapValues()を適用します(キーを変更するかどうかに応じて異なります)。

「このリストを2つのプロデューサストリームに分割するにはどうすればよいですか」という質問もあります。ストリームを複数に分割する場合は、branch()を使用できます。詳細については

、私はドキュメントを参照してください:http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations

+0

@ user2538255私の答えが不明な場合は、お気軽にフォローアップしてください。 –

+0

私がやっていることはまさにそれです。 Abhishekの答えを尋ねた後、私はこの例に着いた。https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/コンフルエント/ examples/streams/WordCountLambdaIntegrationTest.java – user2538255

+0

正解を受け入れました:)ありがとうございました:) – user2538255

2

あなたのキー(タイプ)は何ですか?私はそれがないと推測しているStringmapValuesを実行した後、これは-KStream<K,List<Message>>になります。 KStringない場合、someFunction()StringKを変換する(そのがある場合、あなたはすでに結果を持っている)とList<Message>を残してmapことができます(値)そのままそれがあなたの意図した最終結果

+0

うん、それはトンをworks..thanks! – user2538255

関連する問題