私は簡単にakka-streamの流れを再利用する方法を探しています。akka-stream flowを再利用するエレガントな方法
は私が関数として再利用する予定のフローを扱うので、私は次のようにその署名を維持したいと思います:私はこのフローを使用するときに今、私のことができるようにしたいと思い
Flow[Input, Output, NotUsed]
「コール'この流れを続け、さらなる処理のために結果を保管しておきます。
フローを開始して[Input]
を発行し、私のフローを適用し、フロー放出[(Input, Output)]
に進みたいとします。
例:.via()
と流れを組み合わせることがちょうど[Output]
val via: Source[String, NotUsed] = s.via(stringIfEven)
を発する私の流れを与えるため
val s: Source[Int, NotUsed] = Source(1 to 10)
val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString)
val via: Source[(Int, String), NotUsed] = ???
は今、これは代替が私の再利用可能なフローが[(Input, Output)]
を発するようにすることです簡単な方法では不可能ですしかし、これはすべての流れがすべての段階を通して入力を押して、自分のコードを悪く見せる必要があります。
だから私はこのようなコンバイナを思い付いた:直接フローへの入力を放送し、同様に平行線にある
def tupledFlow[In,Out](flow: Flow[In, Out, _]):Flow[In, (In,Out), NotUsed] = {
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[In](2))
val zip = b.add(Zip[In, Out])
broadcast.out(0) ~> zip.in0
broadcast.out(1) ~> flow ~> zip.in1
FlowShape(broadcast.in, zip.out)
})
}
- >「の両方ジップ」にステージでは、値をタプルに結合します。
val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEven))
すべて素晴らしいが、所定のフローが「フィルタ」操作を行っているとき - このコンバイナが滞っていると、さらにイベントの処理を停止します。次に、それをエレガントに適用することができます。
私は、すべてのサブフローで同じことを行う「Zip」動作が原因だと思います。私の場合、あるブランチが特定のオブジェクトを直接渡しているので、別のサブフローはこの要素を無視できません。 filter()、それ以降はZipがプッシュを待っているのでフローが停止します。
流れの構成を達成するためのより良い方法はありますか? 'flow'が 'filter'の要素を無視するときに、私がtupledFlowで何かできることはありますか?
からのすべてのテストに合格します。各入力要素に対して、0,1、またはそれ以上の出力要素を返すことがあります。さらに多くのデータが利用可能になったときに入力要素を戻して、後で使用することさえできます。このため、ラップされたフローがそれ自体をサポートしていない場合、この機能を一般的に提供することは不可能です。ラップされた 'Flow'が実際に関数のように動作する1対1のフローであることが厳密に強制されていれば(フィルタは動作しません)、一般的に動作します。通常、このような場合に 'mapAsync'を使用する方が簡単です。 – jrudolph
はい、そうです。私の再利用可能なフローがN個の要素を返す場合、問題が発生します。ラップされた 'Flow'が1つの入力要素ごとに0または1要素を出力するという仮定を述べれば、' Flow'出力をラップした場合にのみ入力でジッパーする異なるセマンティック 'Zip'演算子を書くことができます。 「流れ」はどんな要素も押していない。 –
ラップされたフローの引き込みと押し込みが同期して行われないため、これを行うことは困難です。ラップされた「フローが要素をプッシュしていない」かどうかを確認することはできません。遅いかバッファされているかどうかは確認できません。 – jrudolph