Javaでカスタムソースを作成することを試しています。具体的には、BlockingQueueから要素を取るSourceを書きました。私はSource.queueを知っていますが、マージステージにそれらのいくつかを接続すると、マテリアライズされた値を取得する方法がわかりません。とにかく、ここでの実装があります:Akka Streams-マージステージでは、すべての上流ソースがプッシュされたときにのみ下流をプッシュすることがあります
public class TestingSource extends GraphStage<SourceShape<String>> {
private static final ExecutorService executor = Executors.newCachedThreadPool();
public final Outlet<String> out = Outlet.create("TestingSource.out");
private final SourceShape<String> shape = SourceShape.of(out);
private final BlockingQueue<String> queue;
private final String identifier;
public TestingSource(BlockingQueue<String> queue, String identifier) {
this.queue = queue;
this.identifier = identifier;
}
@Override
public SourceShape<String> shape() {
return shape;
}
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape()) {
private AsyncCallback<BlockingQueue<String>> callBack;
{
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
String string = queue.poll();
if (string == null) {
System.out.println("TestingSource " + identifier + " no records in queue, invoking callback");
executor.submit(() -> callBack.invoke(queue)); // necessary, otherwise blocks upstream
} else {
System.out.println("TestingSource " + identifier + " found record during pull, pushing");
push(out, string);
}
}
});
}
@Override
public void preStart() {
callBack = createAsyncCallback(queue -> {
String string = null;
while (string == null) {
Thread.sleep(100);
string = queue.poll();
}
push(out, string);
System.out.println("TestingSource " + identifier + " found record during callback, pushed");
});
}
};
}
}
この例では動作しますので、私のソースが正常に動作しているようだ:
private static void simpleStream() throws InterruptedException {
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
Source.fromGraph(new TestingSource(queue, "source"))
.to(Sink.foreach(record -> System.out.println(record)))
.run(materializer);
Thread.sleep(2500);
queue.add("first");
Thread.sleep(2500);
queue.add("second");
}
私もマージステージにソースの2を接続する例を書きました:
private static void simpleMerge() throws InterruptedException {
BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
BlockingQueue<String> queue2 = new LinkedBlockingQueue<>();
final RunnableGraph<?> result = RunnableGraph.fromGraph(GraphDSL.create(
Sink.foreach(record -> System.out.println(record)),
(builder, out) -> {
final UniformFanInShape<String, String> merge =
builder.add(Merge.create(2));
builder.from(builder.add(new TestingSource(queue1, "queue1")))
.toInlet(merge.in(0));
builder.from(builder.add(new TestingSource(queue2, "queue2")))
.toInlet(merge.in(1));
builder.from(merge.out())
.to(out);
return ClosedShape.getInstance();
}));
result.run(materializer);
Thread.sleep(2500);
System.out.println("seeding first queue");
queue1.add("first");
Thread.sleep(2500);
System.out.println("seeding second queue");
queue2.add("second");
}
私はそれが別の5秒後に「第二の」5秒後に「第1」、その後、印刷物を印刷しexpect-として時には、この例では動作します。
しかし、時には(約1回5回)、10秒後に "second"が印刷され、すぐに "first"が印刷されます。つまり、マージステージは、両方のソースが何かをプッシュした場合にのみ、ストリームをプッシュダウンします。 フル出力は次のようになります。
TestingSource queue1 no records in queue, invoking callback
TestingSource queue2 no records in queue, invoking callback
seeding first queue
seeding second queue
TestingSource queue2 found record during callback, pushed
second
TestingSource queue2 no records in queue, invoking callback
TestingSource queue1 found record during callback, pushed
first
TestingSource queue1 no records in queue, invoking callback
この現象はMergePreferredとMergePrioritizedでより頻繁に起こります。
私の質問は、これはMergeの正しい動作ですか?そうでない場合、私は何を間違っているのですか?
ありがとうございました。任意の量のソースを使用したい場合(例えば、私は 'List'を持っています)、それらをマージステージに接続したいのですが?どうすればそれらのキューをすべて取得できますか?また、 'Thread.sleep'がメインスレッドにありますが、なぜストリームに影響を与えますか? –
akir94
任意の量のソースをマージするには、MergeHubを調べます。ここのドキュメントhttp://doc.akka.io/docs/akka/2.5/scala/stream/stream-dynamic.html#using-the-mergehub。あなたのコードのブロックビットについては、 'prestart'関数の一部として' Thread.sleep'を取得し、 'onPull'コールバックで' queue.poll'を取得しました。これらはすべてブロックされており、専用のディスパッチャで実行する場合を除いて、グラフステージ内で呼び出されるべきではありません。詳細はこちらhttp://doc.akka.io/docs/akka/2.5/scala/dispatchers.html#blocking-needs-careful-management –
ありがとう、私は完全に 'MergeHub'を逃した。もう1つの質問 - 私は 'MergePrioritized'に似た機能を望んでいます。ここでは' Source'の優先順位が異なります。 MergeHubでこれを達成する正しい方法は何ですか?ドキュメントはそれをカバーしていないようです。 – akir94