2017-02-23 8 views
1

これは、GraphDSL APIを使用した、本当にシンプルで初心者の質問です。私は、スレッドSO関連するいくつかを読んで、私は答えが表示されない:Akka Streams:GraphDSL APIからMaterialized Sink出力を取得するにはどうすればよいですか?

val actorSystem = ActorSystem("QuickStart") 
val executor = actorSystem.dispatcher 
val materializer = ActorMaterializer()(actorSystem) 

val source: Source[Int, NotUsed] = Source(1 to 5) 
val throttledSource = source.throttle(1, 1.second, 1, ThrottleMode.shaping) 
val intDoublerFlow = Flow.fromFunction[Int, Int](i => i * 2) 
val sink = Sink.foreach(println) 

val graphModel = GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 

    throttledSource ~> intDoublerFlow ~> sink 

    // I presume I want to change this shape to something else 
    // but I can't figure out what it is. 
    ClosedShape 
} 
// TODO: This is RunnableGraph[NotUsed], I want RunnableGraph[Future[Done]] that gives the 
// materialized Future[Done] from the sink. I presume I need to use a GraphDSL SourceShape 
// but I can't get that working. 
val graph = RunnableGraph.fromGraph(graphModel) 

// This works and gives me the materialized sink output using the simpler API. 
// But I want to use the GraphDSL so that I can add branches or junctures. 
val graphThatIWantFromDslAPI = throttledSource.toMat(sink)(Keep.right) 

答えて

4

トリックは、GraphDSL.createに(あなたのケースでは、sink)のマテリアライズされた値の値段を渡すことです。 2番目のパラメータとして渡す関数も同様に変更され、グラフに使用できるShape入力パラメータ(下の例ではs)が必要です。

val graphModel: Graph[ClosedShape, Future[Done]] = GraphDSL.create(sink) { implicit b => s => 
    import GraphDSL.Implicits._ 

    throttledSource ~> intDoublerFlow ~> s 

    // ClosedShape is just fine - it is always the shape of a RunnableGraph 
    ClosedShape 
    } 
    val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel) 

さらに詳しい情報はdocsにあります。

+0

私はあなたの文書参照のためにアップしました;-) –

+1

ああ、フィニッシュラインに殴ら;)よく遊ん –

+0

あなたのおかげで。 GraphDSL.create(sink)呼び出しにシンクを追加すると、ClosedShapeはコンパイラエラーを取得します。それをどのように更新するのですか? – clay

3
val graphModel = GraphDSL.create(sink) { implicit b: Builder[Future[Done]] => sink => 
    import akka.stream.scaladsl.GraphDSL.Implicits._ 

    throttledSource ~> intDoublerFlow ~> sink 

    ClosedShape 
} 
val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel)  
val graphThatIWantFromDslAPI: RunnableGraph[Future[Done]] = throttledSource.toMat(sink)(Keep.right) 

GraphDSLのAPIに問題が暗黙のビルダーが頻繁に過負荷になっていることを、です。 createにシンクをラップする必要があります。Builder[NotUsed]Builder[Future[Done]]に変換し、今度はbuilder => shapeの代わりにbuilder => sink => shapeの機能を表します。

+0

ありがとうございます。シンクパラメータを 'GraphDSL.create'に追加すると' ClosedShape'行に新しいコンパイラエラーが発生します。どのように私はそれを更新する任意のアイデア? – clay

+0

申し訳ありません、他の回答:-) –

関連する問題