2017-10-26 12 views
0

私はAkka Streamsを使用して、同時にリクエストをサーバーに送信し、各リクエストをオリジナルのコンテキスト(この例ではInt)に関連付けることを試みています。これは私が特にakkaストリームの将来を変える

val createRequestFlow: Flow[(String, String), (HttpRequest, Int), _] = Flow.fromFunction[(String, String), (HttpRequest, Int)]((mkRequest _).tupled) 
val sendRequestFlow: Flow[(HttpRequest, Int), (HttpResponse, Int), _] = Flow[(HttpRequest, Int)].mapAsyncUnordered(32)((sendRequest _).tupled) 
val handleResponseFlow: Flow[(HttpResponse, Int), String, _] = Flow[(HttpResponse, Int)].map[String]((getStatusString _).tupled) 

val handler = createRequestFlow via sendRequestFlow via handleResponseFlow 

toghether入れている流れですが、私はFuture[(HttpResponse, Int)]を返す方法を見つけようとしています。現在、私はこの

def sendRequest(request: HttpRequest, ctx: Int): Future[(HttpResponse, Int)] = { 
    Http().singleRequest(request).map(r => (r,ctx)) 
    } 

をやっているが、私は、これはエグゼキュータを必要とするという事実は、それを行うには、別の(より良い)方法があることを示していることを理解しています。

ありがとうございます。

+1

あなたは正しいパスにいます。単に 'sendRequest'を暗黙のExecutionContextにしてください。 –

答えて

1

私は良い方法はないと思います。 Akkaは標準のScala Futuresを使用し、ほとんどすべての操作を実行するには、設計上、ExecutionContextが必要です。この単純なmapのために別のスレッドを使用したくない場合は、Akkaが内部で使用するものと同じようなsameThreadExecutionContextを作成することができます(akka.dispatch.ExecutionContexts.sameThreadExecutionContext参照)。したがって、mapは、メインのHttp応答を処理するスレッドで実行されますより複雑なものには使用しないでください(GitHub #19043のディスカッションも参照してください)。