2017-11-29 6 views
0

私が試した:EventStream - >ソース - >アッカHTTP(SSE)AkkaのEventStreamからサーバー側のイベントを作成するにはどうすればよいですか?

を、私はそれを見るように、これは動作しないことができ、ソースはアッカHTTP complete(Source, ...) によってマテリアライズされ、マテリアライズドへEventStreamからメッセージを送信するので、ソース私は(ActorRefことを取得する方法はありますか?)ActorRefが必要


は、私が使用していますActorPublisher GitHubの上の解決策発見しました:ActorPublisherは内部APIであるため、 https://github.com/calvinlfer/Akka-HTTP-Akka-Streams-Akka-Actors-Integration

をしかし、 、私はまだ清潔な解決策を望んでいます。

+0

「EventStream」からのイベントストリームが終了したときに、開発者はどのように知っていますか? –

答えて

1

次のような方法で、ServerSentEventインスタンス、およびBroadcastHub.sinkにイベントストリーム要素を変換Sourceを作成するためにSource.actorRefを使用することができます:下流の需要がある場合は、

val (sseActor, sseSource) = 
    Source.actorRef[EventStreamMessageOrWhatever](10, akka.stream.OverflowStrategy.dropTail) 
    .map(s => /* convert event stream elements to ServerSideEvent */) 
    .keepAlive(1.second,() => ServerSentEvent.heartbeat) 
    .toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both) 
    .run() 

をメッセージ(すなわち、イベント・ストリーム要素)がダウンストリームに出力されます。下流の需要がない場合、メッセージは指定されたオーバーフローストラテジで特定の数(この例ではバッファサイズは10)までバッファリングされます。

その後、EventStreamにマテリアライズド俳優を購読することができます。

eventStream.subscribe(sseActor, ...) 

また、マテリアライズドSourceは、あなたのパスに使用するために提供されています:

path("sse") { 
    get { 
    complete(sseSource) 
    } 
} 

(注)このアプローチとは背圧がないこと。

関連する問題