2017-09-14 2 views
0

Javaでカスタムソースを作成することを試しています。具体的には、BlockingQueueから要素を取るSourceを書きました。私はSource.queueを知っていますが、マージステージにそれらのいくつかを接続すると、マテリアライズされた値を取得する方法がわかりません。とにかく、ここでの実装があります:Akka Streams-マージステージでは、すべての上流ソースがプッシュされたときにのみ下流をプッシュすることがあります

public class TestingSource extends GraphStage<SourceShape<String>> { 
    private static final ExecutorService executor = Executors.newCachedThreadPool(); 

    public final Outlet<String> out = Outlet.create("TestingSource.out"); 
    private final SourceShape<String> shape = SourceShape.of(out); 

    private final BlockingQueue<String> queue; 
    private final String identifier; 

    public TestingSource(BlockingQueue<String> queue, String identifier) { 
     this.queue = queue; 
     this.identifier = identifier; 
    } 

    @Override 
    public SourceShape<String> shape() { 
     return shape; 
    } 

    @Override 
    public GraphStageLogic createLogic(Attributes inheritedAttributes) { 
     return new GraphStageLogic(shape()) { 
      private AsyncCallback<BlockingQueue<String>> callBack; 

      { 
       setHandler(out, new AbstractOutHandler() { 
        @Override 
        public void onPull() throws Exception { 
         String string = queue.poll(); 
         if (string == null) { 
          System.out.println("TestingSource " + identifier + " no records in queue, invoking callback"); 
          executor.submit(() -> callBack.invoke(queue)); // necessary, otherwise blocks upstream 
         } else { 
          System.out.println("TestingSource " + identifier + " found record during pull, pushing"); 
          push(out, string); 
         } 
        } 
       }); 
      } 

      @Override 
      public void preStart() { 
       callBack = createAsyncCallback(queue -> { 
        String string = null; 
        while (string == null) { 
         Thread.sleep(100); 
         string = queue.poll(); 
        } 
        push(out, string); 
        System.out.println("TestingSource " + identifier + " found record during callback, pushed"); 
       }); 
      } 
     }; 
    } 
} 

この例では動作しますので、私のソースが正常に動作しているようだ:

private static void simpleStream() throws InterruptedException { 
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(); 
    Source.fromGraph(new TestingSource(queue, "source")) 
      .to(Sink.foreach(record -> System.out.println(record))) 
      .run(materializer); 

    Thread.sleep(2500); 
    queue.add("first"); 

    Thread.sleep(2500); 
    queue.add("second"); 
} 

私もマージステージにソースの2を接続する例を書きました:

private static void simpleMerge() throws InterruptedException { 
    BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(); 
    BlockingQueue<String> queue2 = new LinkedBlockingQueue<>(); 

    final RunnableGraph<?> result = RunnableGraph.fromGraph(GraphDSL.create(
      Sink.foreach(record -> System.out.println(record)), 
      (builder, out) -> { 
       final UniformFanInShape<String, String> merge = 
         builder.add(Merge.create(2)); 
       builder.from(builder.add(new TestingSource(queue1, "queue1"))) 
         .toInlet(merge.in(0)); 
       builder.from(builder.add(new TestingSource(queue2, "queue2"))) 
         .toInlet(merge.in(1)); 

       builder.from(merge.out()) 
         .to(out); 
       return ClosedShape.getInstance(); 
      })); 
    result.run(materializer); 

    Thread.sleep(2500); 
    System.out.println("seeding first queue"); 
    queue1.add("first"); 

    Thread.sleep(2500); 
    System.out.println("seeding second queue"); 
    queue2.add("second"); 
} 

私はそれが別の5秒後に「第二の」5秒後に「第1」、その後、印刷物を印刷しexpect-として時には、この例では動作します。

しかし、時には(約1回5回)、10秒後に "second"が印刷され、すぐに "first"が印刷されます。つまり、マージステージは、両方のソースが何かをプッシュした場合にのみ、ストリームをプッシュダウンします。 フル出力は次のようになります。

TestingSource queue1 no records in queue, invoking callback 
TestingSource queue2 no records in queue, invoking callback 
seeding first queue 
seeding second queue 
TestingSource queue2 found record during callback, pushed 
second 
TestingSource queue2 no records in queue, invoking callback 
TestingSource queue1 found record during callback, pushed 
first 
TestingSource queue1 no records in queue, invoking callback 

この現象はMergePreferredとMergePrioritizedでより頻繁に起こります。

私の質問は、これはMergeの正しい動作ですか?そうでない場合、私は何を間違っているのですか?

答えて

0

一見すると、ステージの真ん中でThread.sleepのスレッドをブロックすることは、少なくとも問題の1つと考えられます。

とにかく、質問の冒頭に記載されているように、Source.queueを使用する方が簡単だと思います。問題がGraphDSLを使用した場合、マテリアライズド値を抽出することであるならば、ここであなたがそれを行う方法は次のとおりです。docsでこの上

final Source<String, SourceQueueWithComplete<String>> source = Source.queue(100, OverflowStrategy.backpressure()); 
    final Sink<Object, CompletionStage<akka.Done>> sink = Sink.ignore(); 

    final RunnableGraph<Pair<SourceQueueWithComplete<String>, CompletionStage<akka.Done>>> g = 
      RunnableGraph.fromGraph(
        GraphDSL.create(
          source, 
          sink, 
          Keep.both(), 
          (b, src, snk) -> { 
           b.from(src).to(snk); 
           return ClosedShape.getInstance(); 
          } 
        ) 
      ); 

    g.run(materializer); // this gives you back the queue 

詳細情報。

+0

ありがとうございました。任意の量のソースを使用したい場合(例えば、私は 'List 'を持っています)、それらをマージステージに接続したいのですが?どうすればそれらのキューをすべて取得できますか?また、 'Thread.sleep'がメインスレッドにありますが、なぜストリームに影響を与えますか? – akir94

+0

任意の量のソースをマージするには、MergeHubを調べます。ここのドキュメントhttp://doc.akka.io/docs/akka/2.5/scala/stream/stream-dynamic.html#using-the-mergehub。あなたのコードのブロックビットについては、 'prestart'関数の一部として' Thread.sleep'を取得し、 'onPull'コールバックで' queue.poll'を取得しました。これらはすべてブロックされており、専用のディスパッチャで実行する場合を除いて、グラフステージ内で呼び出されるべきではありません。詳細はこちらhttp://doc.akka.io/docs/akka/2.5/scala/dispatchers.html#blocking-needs-careful-management –

+0

ありがとう、私は完全に 'MergeHub'を逃した。もう1つの質問 - 私は 'MergePrioritized'に似た機能を望んでいます。ここでは' Source'の優先順位が異なります。 MergeHubでこれを達成する正しい方法は何ですか?ドキュメントはそれをカバーしていないようです。 – akir94

関連する問題