2016-10-06 5 views
1

私は今、私は、例えば、キーのサブセットをフィルタリングしたいアッカストリームフィルタ&グループは、

case class Msg(keys: Seq[Char], value: String) 

の流れを持っています val filterKeys = Set[Char]('k','f','c')Filter(k.exists(filterKeys.contains))) これらを分割して、特定のキーが異なるフローによって処理され、最後に一緒にマージされるようにします。

        /-key=k-> f1 --\ 
Source[Msg] ~> Filter ~> router |--key=f-> f2 ----> Merge --> f4 
           \-key=c-> f3 --/ 

どうすればよいですか?古い方法で

FlexiRouteは行くには良い方法のように思えたが、新しいAPIに私はカスタムGraphStageを作るか、または私は介してこれを行う方法がわからなかったとしてDSLから自分のグラフを作成するかしたい推測しています組み込みのステージ..?

答えて

4

小型キーセットソリューション

あなたのキーセットは小さく、かつ不変である場合には、放送やフィルタの組み合わせは、おそらく理解するための最も簡単な実装になります。 in the documentationを説明するように

def goodKeys(keySet : Set[Char]) = Flow[Msg] filter (_.keys exists keySet.contains) 

これはその後、放送局を養うことができます:あなたは、まずあなたが説明したフィルタを定義する必要があります。そして、あなたのキーセットが大きい場合

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
    import GraphDSL.Implicits._ 

    val source : Source[Msg] = ??? 

    val goodKeyFilter = goodKeys(Set('k','f','c')) 

    val bcast = builder.add(BroadCast[Msg](3)) 
    val merge = builder.add(Merge[Msg](3)) 

    val kKey = goodKeys(Set('k')) 
    val fKey = goodKeys(Set('f')) 
    val cKey = goodKeys(Set('c')) 

    //as described in the question 
    val f1 : Flow[Msg, Msg, _] = ??? 
    val f2 : Flow[Msg, Msg, _] = ??? 
    val f3 : Flow[Msg, Msg, _] = ??? 

    val f4 : Sink[Msg,_] = ??? 

    source ~> goodKeyFilter ~> bcast ~> kKey ~> f1 ~> merge ~> f4 
          bcast ~> fKey ~> f2 ~> merge 
          bcast ~> cKey ~> f3 ~> merge 

大型キーセットソリューション

:良いキーを持つすべてのMsg値は、3つのフィルタのそれぞれにブロードキャストされ、各フィルタは、特定のキーを許可しますgroupByが良いです。あなたは関数への鍵のMapがあるとします。

//e.g. 'k' -> f1 
val keyFuncs : Map[Set[Char], (Msg) => Msg] 

このマップは、GROUPBY機能を使用することができます。

source 
    .via(goodKeys(Set('k','f','c')) 
    .groupBy(keyFuncs.size, _.keys) 
    .map(keyFuncs(_.keys)) //apply one of f1,f2,f3 to the Msg 
    .mergeSubstreams