単一の要素の後にakkaストリームが停止しています。私のストリームは以下の通りです:1つの要素の後にAkkaストリームが停止します
val firehoseSource = Source.actorPublisher[FirehoseActor.RawTweet](
FirehoseActor.props(
auth = ...
)
)
val ref = Flow[FirehoseActor.RawTweet]
.map(r => ResponseParser.parseTweet(r.payload))
.map { t => println("Received: " + t); t }
.to(Sink.onComplete({
case Success(_) => logger.info("Stream completed")
case Failure(x) => logger.error(s"Stream failed: ${x.getMessage}")
}))
.runWith(firehoseSource)
FirehoseActor
TwitterのFirehoseに接続し、メッセージをキューにバッファします。アクターがRequest
メッセージを受信すると、take
の次の要素とそれを返す:
def receive = {
case Request(_) =>
logger.info("Received request for next firehose element")
onNext(RawTweet(queue.take()))
}
問題は、単一のつぶやきがコンソールに出力されていることです。プログラムは終了したりエラーを投げたりすることはなく、私はロギングステートメントを振りかざしていて、何も印刷されていません。
シンクが要素を引き抜く圧力をかけ続けていると思ったが、Sink.onComplete
のメッセージのどちらも印刷されないので、そうではないようだ。私もSink.ignore
を使ってみましたが、それは単一の要素だけを印刷しました。アクター内のログメッセージは1度だけ印刷されます。
フローを通してエレメントを無限に引き出すためには、どのシンクを使用する必要がありますか?