2017-11-15 6 views
1

私はカフカのトピックを読んで、それに基づいていくつかの処理を行い、結果を別のトピックに保存しようとしています。Kafkaストリームを使用してSeqを抽出する

私のコードは以下のようになります。

builder 
    .stream(settings.Streams.inputTopic) 
    .mapValues[Seq[Product]]((e: EventRecord) ⇒ fx(e)) 
    // Something needs to be done here... 
    .to(settings.Streams.outputTopic) 

fx(e)機能は、いくつかの処理を行い、Seq[Product]を返します。すべての製品をトピックに別々のエントリとして保存したいと思います。問題は、トピックから読み取ったメッセージに複数の商品が含まれているため、戻り値がfx(e)であることです。

この動作をストリームに埋め込むことはできますか?

答えて

3

利用代わりmapValuesflatMapValues:トリックをした

import scala.collection.JavaConverters.asJavaIterableConverter 

builder 
    .stream(settings.Streams.inputTopic) 
    .flatMapValues(e => fx(e).toIterable.asJava) 
    .to(settings.Streams.outputTopic) 
+0

。ありがとう! –

関連する問題