2016-06-28 17 views
0

単一の要素の後にakkaストリームが停止しています。私のストリームは以下の通りです:1つの要素の後にAkkaストリームが停止します

val firehoseSource = Source.actorPublisher[FirehoseActor.RawTweet](
    FirehoseActor.props(
    auth = ... 
) 
) 

val ref = Flow[FirehoseActor.RawTweet] 
    .map(r => ResponseParser.parseTweet(r.payload)) 
    .map { t => println("Received: " + t); t } 
    .to(Sink.onComplete({ 
    case Success(_) => logger.info("Stream completed") 
    case Failure(x) => logger.error(s"Stream failed: ${x.getMessage}") 
    })) 
    .runWith(firehoseSource) 

FirehoseActor TwitterのFirehoseに接続し、メッセージをキューにバッファします。アクターがRequestメッセージを受信すると、takeの次の要素とそれを返す:

def receive = { 
    case Request(_) => 
    logger.info("Received request for next firehose element") 
    onNext(RawTweet(queue.take())) 
} 

問題は、単一のつぶやきがコンソールに出力されていることです。プログラムは終了したりエラーを投げたりすることはなく、私はロギングステートメントを振りかざしていて、何も印刷されていません。

シンクが要素を引き抜く圧力をかけ続けていると思ったが、Sink.onCompleteのメッセージのどちらも印刷されないので、そうではないようだ。私もSink.ignoreを使ってみましたが、それは単一の要素だけを印刷しました。アクター内のログメッセージは1度だけ印刷されます。

フローを通してエレメントを無限に引き出すためには、どのシンクを使用する必要がありますか?

答えて

0

ああ私は俳優の中でtotalDemandを尊敬していたはずです。これは、問題は修正されます。私は、ストリーム内の各要素に対してRequestを受け取るために期待していた

def receive = { 
    case Request(_) => 
    logger.info("Received request for next firehose element") 
    while (totalDemand > 0) { 
     onNext(RawTweet(queue.take())) 
    } 

を、どうやら各フローはRequestを送信します。

関連する問題