与えられた:私はカフカで2つのトピックを話題AとトピックBとしましょう。カフカストリームはトピックAからレコードを読み取り、処理して複数のレコードを生成します(recordAとrecordBとしましょう)消費されたレコードに対応する。さて、カフカストリームを使ってこれをどうやって達成できるのかという疑問があります。カフカストリーム:複数のレコードに1つのレコード
KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
@Override
public List<Message> apply(final Message message) {
return consumerRecordHandler.process(message);
}
}).*someFunction*()
ここで、読み取られたレコードはメッセージです。処理後、メッセージのリストを返します。このリストを2つのプロデューサストリームにどのように分けることができますか?どんな助けもありがとう。
@ user2538255私の答えが不明な場合は、お気軽にフォローアップしてください。 –
私がやっていることはまさにそれです。 Abhishekの答えを尋ねた後、私はこの例に着いた。https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/コンフルエント/ examples/streams/WordCountLambdaIntegrationTest.java – user2538255
正解を受け入れました:)ありがとうございました:) – user2538255