2015-11-20 7 views
9

私はakkaストリームを使用しています。フローが特定の値を処理できないため、条件付きでスキップする必要があるグラフのセグメントがあります。具体的には、文字列を受け取りHTTPリクエストを行うフローがありますが、文字列が空の場合、サーバーはそのケースを処理できません。しかし、代わりに空の文字列を返す必要があります。失敗することを知ってhttpリクエストを通過することなくこれを行う方法はありますか?私は基本的にこれを持っている:条件付きでakkaストリームを使用してフローをスキップします。

val source = Source("1", "2", "", "3", "4") 
val httpRequest: Flow[String, HttpRequest, _] 
val httpResponse: Flow[HttpResponse, String, _] 
val flow = source.via(httpRequest).via(httpResponse) 

私は私はHttpResponse流れに400エラーをキャッチされてやっとデフォルト値を返すと考えることができる唯一のこと。しかし、私は、私が知っている要求があらかじめ失敗することが予想されるため、サーバーに当たるオーバーヘッドを避けることができるようにしたいと考えています。

+0

あなたの例はコンパイルされません。 httpRequestの出力はHttpRequest型であり、httpResponseの入力はHttpResponse型であるため、 'via'と連鎖することはできません。 –

答えて

8

ヴィクトル・クランのソリューションは簡潔でelegantです。グラフを使って別の方法を実証したかっただけです。

ストリングのソースを2つのストリームに分割し、有効なストリングと無効なストリングの1つのストリームをフィルタリングできます。結果をマージします( "cross the streams")。

val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder: FlowGraph.Builder[Unit] => 
    import FlowGraph.Implicits._ 

    val source = Source(List("1", "2", "", "3", "4")) 
    val sink : Sink[String,_] = ??? 

    val bcast = builder.add(Broadcast[String](2)) 
    val merge = builder.add(Merge[String](2)) 

    val validReq = Flow[String].filter(_.size > 0) 
    val invalidReq = Flow[String].filter(_.size == 0) 

    val httpRequest: Flow[String, HttpRequest, _] = ??? 
    val makeHttpCall: Flow[HttpRequest, HttpResponse, _] = ??? 
    val httpResponse: Flow[HttpResponse, String, _] = ??? 
    val someHttpTransformation = httpRequest via makeHttpCall via httpResponse 

    source ~> bcast ~> validReq ~> someHttpTransformation ~> merge ~> sink 
      bcast ~>  invalidReq     ~> merge 
    ClosedShape 
}) 

注:documentationに基づい

この溶液は、ストリームを分割し、従ってシンク入力に基づいて予想されるよりも、異なる順序で文字列値の結果を処理することができます。

+2

はい!これは、操作の並べ替えで(http-flowとnon-httpが並行して動作するので)OKである場合、別のオプションです。 –

+0

@ViktorKlang Noted、ありがとうございます。 –

+0

これは完璧です。私は並列フィルタについては考えなかった。注文についての良い点はありますが、akka-httpのリクエストフローは、とにかく順不同です(少なくともhttpsプールは1つです) – Falmarri

12

あなたはflatMapConcatを使用することができます。

(警告:コンパイルされていませんでしたが、あなたはそれの要点を得るでしょう)

val source = Source("1", "2", "", "3", "4") 
val httpRequest: Flow[String, HttpRequest, _] 
val httpResponse: Flow[HttpResponse, String, _] 
val makeHttpCall: Flow[HttpRequest, HttpResponse, _] 
val someHttpTransformation = httpRequest via makeHttpCall via httpResponse 
val emptyStringSource = Source.single("") 
val cleanerSource = source.flatMapConcat({ 
    case "" => emptyStringSource 
    case other => Source.single(other) via someHttpTransformation 
}) 
関連する問題