2016-12-28 2 views
6

私は簡単にakka-streamの流れを再利用する方法を探しています。akka-stream flowを再利用するエレガントな方法

は私が関数として再利用する予定のフローを扱うので、私は次のようにその署名を維持したいと思います:私はこのフローを使用するときに今、私のことができるようにしたいと思い

Flow[Input, Output, NotUsed]

「コール'この流れを続け、さらなる処理のために結果を保管しておきます。

フローを開始して[Input]を発行し、私のフローを適用し、フロー放出[(Input, Output)]に進みたいとします。

例:.via()と流れを組み合わせることがちょうど[Output]

val via: Source[String, NotUsed] = s.via(stringIfEven) 

を発する私の流れを与えるため

val s: Source[Int, NotUsed] = Source(1 to 10) 

val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString) 

val via: Source[(Int, String), NotUsed] = ??? 

は今、これは代替が私の再利用可能なフローが[(Input, Output)]を発するようにすることです簡単な方法では不可能ですしかし、これはすべての流れがすべての段階を通して入力を押して、自分のコードを悪く見せる必要があります。

だから私はこのようなコンバイナを思い付いた:直接フローへの入力を放送し、同様に平行線にある

def tupledFlow[In,Out](flow: Flow[In, Out, _]):Flow[In, (In,Out), NotUsed] = { 
    Flow.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 

    val broadcast = b.add(Broadcast[In](2)) 
    val zip = b.add(Zip[In, Out]) 

    broadcast.out(0) ~> zip.in0 
    broadcast.out(1) ~> flow ~> zip.in1 

    FlowShape(broadcast.in, zip.out) 
}) 

}

- >「の両方ジップ」にステージでは、値をタプルに結合します。

val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEven)) 

すべて素晴らしいが、所定のフローが「フィルタ」操作を行っているとき - このコンバイナが滞っていると、さらにイベントの処理を停止します。次に、それをエレガントに適用することができます。

私は、すべてのサブフローで同じことを行う「Zip」動作が原因だと思います。私の場合、あるブランチが特定のオブジェクトを直接渡しているので、別のサブフローはこの要素を無視できません。 filter()、それ以降はZipがプッシュを待っているのでフローが停止します。

流れの構成を達成するためのより良い方法はありますか? 'flow'が 'filter'の要素を無視するときに、私がtupledFlowで何かできることはありますか?

+0

からのすべてのテストに合格します。各入力要素に対して、0,1、またはそれ以上の出力要素を返すことがあります。さらに多くのデータが利用可能になったときに入力要素を戻して、後で使用することさえできます。このため、ラップされたフローがそれ自体をサポートしていない場合、この機能を一般的に提供することは不可能です。ラップされた 'Flow'が実際に関数のように動作する1対1のフローであることが厳密に強制されていれば(フィルタは動作しません)、一般的に動作します。通常、このような場合に 'mapAsync'を使用する方が簡単です。 – jrudolph

+0

はい、そうです。私の再利用可能なフローがN個の要素を返す場合、問題が発生します。ラップされた 'Flow'が1つの入力要素ごとに0または1要素を出力するという仮定を述べれば、' Flow'出力をラップした場合にのみ入力でジッパーする異なるセマンティック 'Zip'演算子を書くことができます。 「流れ」はどんな要素も押していない。 –

+0

ラップされたフローの引き込みと押し込みが同期して行われないため、これを行うことは困難です。ラップされた「フローが要素をプッシュしていない」かどうかを確認することはできません。遅いかバッファされているかどうかは確認できません。 – jrudolph

答えて

3

つの可能なアプローチ - 議論の余地エレガンスとは - です:

1)Flow[Int, Option[Int], NotUsed]にあなたのフィルタを変異、フィルタリングステージを使用しないでください。この方法で元の計画と同様に、グラフ全体にジッパーラッパーを適用することができます。しかし、コードはより汚染されているように見え、Noneを回すことによってオーバーヘッドが追加されます。

val stringIfEvenOrNone = Flow[Int].map{ 
    case x if x % 2 == 0 => Some(x.toString) 
    case _ => None 
} 

val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEvenOrNone)).collect{ 
    case (num, Some(str)) => (num,str) 
} 

2)フィルター処理ステージとトランスフォームステージを分離し、ジッパーラッパーの前にフィルター処理を適用します。おそらくより軽量でより良い妥協点でしょう。

val filterEven = Flow[Int].filter(_ % 2 == 0) 

val toString = Flow[Int].map(_.toString) 

val tupled: Source[(Int, String), NotUsed] = s.via(filterEven).via(tupledFlow(toString)) 

EDIT

3)コメントでの議論に従って、明確にするため、ここで別の解決策を投稿。

このフローラッパーを使用すると、フローを生成した元の入力要素とペアになっている特定のフローから各要素を放出できます。これは、あらゆる種類の内部フロー(入力ごとに0,1またはそれ以上の要素を放出する)に対して機能します。

def tupledFlow[In,Out](flow: Flow[In, Out, _]): Flow[In, (In,Out), NotUsed] = 
    Flow[In].flatMapConcat(in => Source.single(in).via(flow).map(out => in -> out)) 
+0

はい、オプション0)しかし、アクセス可能なapiを持っていても、 'TupledFlow'ラッパーで呼び出されたときは使用してはいけません。構成可能なコードから期待されるものではありません。 –

+0

究極の解決策は、私がhttp://stackoverflow.com/questions/41366030/elegant-way-of-reusing-akka-stream-flows#comment69952575_41366030 –

+0

に記載されているように振る舞う専用の演算子だと思います。このTupledFlowは間違いなく私が共有するものではありません - 例えば - スタンドアロンのライブラリ。しかし、それはあなたのプロジェクトで内部的に再利用可能なグラフ段階としていくらか意味があります。 –

1

私はラップFlowfilter()またはmapAsync()を使用し、包まれたときにFlowは、すべての入力に対して0,1またはNの要素を発したときに動作しますTupledFlowの実装を思い付いた:

def tupledFlow[In,Out](flow: Flow[In, Out, _])(implicit materializer: Materializer, executionContext: ExecutionContext):Flow[In, (In,Out), NotUsed] = { 
    val v:Flow[In, Seq[(In, Out)], NotUsed] = Flow[In].mapAsync(4) { in: In => 
    val outFuture: Future[Seq[Out]] = Source.single(in).via(flow).runWith(Sink.seq) 
    val bothFuture: Future[Seq[(In,Out)]] = outFuture.map(seqOfOut => seqOfOut.map((in,_))) 
    bothFuture 
    } 
    val onlyDefined: Flow[In, (In, Out), NotUsed] = v.mapConcat[(In, Out)](seq => seq.to[scala.collection.immutable.Iterable]) 
    onlyDefined 
} 

唯一の欠点私はここでは、単一のエンティティのフローをインスタンス化して具体化しています。単に、「関数としてのフローを呼び出す」という概念を得ることです。

私はそれに関するパフォーマンステストをしませんでしたが、重い持ち上げは今後実行されるラップされたFlowで行われているので、これは問題ありません。

この実装は、ここでは概念の主な問題は、 `フロー[T、Uは、...]`関数ではないということですhttps://gist.github.com/kretes/8d5f2925de55b2a274148b69f79e55ac#file-tupledflowspec-scala

+2

これがあなたの後ろにあれば、おそらく逃げることができます ' def tupledFlow [In、Out] :フロー[In、Out、_]):フロー[In、(In、Out)、NotUsed] = { フロー[In] .flatMapConcat(in => Source.single(in)).via(flow).map out => in - > out)) } ' –

+0

はい、私はそれが私の後であると思います。それはあらゆる必要性を満たし、包まれた「流れ」のあらゆる行動を適切に処理します。 –

+0

あなたの実装は簡潔かつ最小限です。ありがとうございました。私はこれが私の質問に対する適切な答えだと思います –

関連する問題