2016-04-16 12 views
1

ここ数日間、Akka Streamsについて読んできました。私はここ数ヶ月間ScalaのRxライブラリで作業していました。私には、両方のライブラリが提供してくれるものがある程度重なっているようです。 RxScalaはちょっと使い始め、理解して使いやすくなっていました。たとえばScalaのRxライブラリを使用してカフカのトピックに接続し、Observableにラップしてサブスクライバにメッセージを受け取らせるという単純な使用例があります。Akka Streams入門

val consumerStream = consumer.createMessageStreamsByFilter(topicFilter(topics), 1, keyDecoder, valueDecoder).head 
val observableConsumer = Observable.fromIterator(consumerStream).map(_.message()) 

これは非常に簡単できれいです。どのように私はakkaストリームを開始する必要がありますかについての手がかりは?上記のソースと同じイベントを発生させたいところです。私は後でフローとシンクを持っていきます。最後に、私のメインクラスで、これら3つを組み合わせてアプリケーションデータフローを実行します。

提案がありますか?だからここ

答えて

2

は私が思い付いたものです:

val kafkaStreamItr = consumer.createMessageStreamsByFilter(topicFilter(topics), 1, keyDecoder, valueDecoder).head 
Source.fromIterator(() => kafkaStreamItr).map(_.message) 
関連する問題