2016-04-27 16 views
0

提案が必要です。たとえば、私は10個のグラフを作成して並列に実行するサンプルコードを作成しました。Akka Stream - 並列実行可能なグラフ

このアプローチは正しいですか、グラフ内に複数のソースを作成して1つのグラフで並列に実行する必要がありますか?

def createGraph(start: Int, end: Int, name: String) = { 
    RunnableGraph.fromGraph(GraphDSL.create() { 
    implicit builder => 
     import GraphDSL.Implicits._ 
     val s = Source(start to end) 
     val f = Flow[Int].map[String](x => x.toString) 
     val sink = Sink.foreach[String](x => println(name + ":" + x)) 

     val t = builder.add(s) 

     val flow1 = builder.add(f) 

     t ~> flow1 ~> sink 

     ClosedShape 
    }) 
} 


(1 to 10).map(x => createGraph(x, x + 10, "g" + x)).map(_.run()) 

おかげ アルン

+0

:ストリームとして、いくつかの外部ソースから読み込むソース(最後に開始)の.map(_のtoString).runForeach(X => printlnを(S "$名: $ x ")) –

+0

私は複雑なフロー処理知的財産を維持するためのメインコードから。問題のステートメントは、複数のソース情報があり、各ソースごとにグラフを実行する必要がある場合、ベストプラクティスとなります。たとえば、複数のソースが複数のカフカトピックを読んで、変換し、処理し、データベースにシンクすると考えるかもしれません。 – ASe

+0

私はかなり奇妙に見えますが、なぜ私はそれをあまり指摘できません。私は自分のステージになるように時間を分け、マージやバランスを使ってそれらをまとめて一つのグラフにします。その後、そのグラフを1回だけ実行します。今あなたはn個の島を作成しています。 – akauppi

答えて

0

私は私のソースが異なっているが、流れhttp://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-parallelism.htmlを、これはよさそうだ、使用して並列処理を試してみましたが、シンクはsame.Each源である以下の例では、シミュレーションであるあなたがあるように、それらを考慮すべてのそのコードは同等のものを行うには、なぜ

object TestParallelGraph extends App { 

    implicit val system = ActorSystem("test") 
    implicit val dispacher = system.dispatcher 
    implicit val materializer = ActorMaterializer() 

    val listOfDifferentSource=List(1,2,3) //consider we have to read data from various sources 


def createGraph() = { 
    RunnableGraph.fromGraph(GraphDSL.create() { 
     implicit builder => 
     import GraphDSL.Implicits._ 

     val merge=builder.add(Merge[Int](listOfDifferentSource.length)) 

     val flow=builder.add(Flow[Int].map(_ + 10)) //just random flow to add 10 

     //again as mentioned above creating source with different information to simulate 
     Source(listOfDifferentSource.head*100 to 100* listOfDifferentSource.head+10) ~> merge ~> flow ~> Sink.foreach(println) 

     for{ 
      i <- listOfDifferentSource.tail //for each other source 
     }yield (Source(100*i to 100*i+10) ~> merge) 

     ClosedShape 
    }) 
    } 

    createGraph().run() 
} 
+0

これは並列ではなく、順番に実行されることに注意してください。リンク先のドキュメントで説明されているように、 '.async'を使って非同期境界をマークする必要があります。 – johanandren

+0

ありがとうジョン。私はkaka-stream-and-http-experimental 2.0.3を使用していますので、mapAsyncは注意が必要です。 – ASe

+0

私のコードはval flow = builder.add(Flow [Int] .mapAsync(parallelism = 10)(_ + 10))になります – ASe

関連する問題