私はakkaストリームを試していますが、私の単純な例では背圧を得ることができません。私はakka(ストリーム)の経験がないので、おそらく私は大きなものを見逃しています。akkaストリームのバックプレッシャを理解するSource.queue
私はそれらを消費するよりも速く整数を生成しているので、バックプレッシャーが入り込むと思いました。 私の目標は、キューに入れられた最新のアイテムを常に消費することです。 bufferSize = 1および OverflowStrategy.dropHead()(ソースキュー上)。
public class SimpleStream {
public static void main(String[] argv) throws InterruptedException {
final ActorSystem system = ActorSystem.create("akka-streams");
final Materializer materializer = ActorMaterializer.create(system);
final Procedure<Integer> slowConsumer = (i) -> {
System.out.println("consuming [" + i + "]");
ThreadUtils.sleepQuietly(1000, TimeUnit.MILLISECONDS);
};
final SourceQueue<Integer> q = Sink
.<Integer>foreach(slowConsumer)
.runWith(Source.<Integer>queue(1, OverflowStrategy.dropHead()), materializer);
final AtomicInteger i = new AtomicInteger(0);
final Thread t = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
int n = i.incrementAndGet();
q.offer(n);
System.out.println("produced: [" + n + "]");
ThreadUtils.sleepQuietly(500, TimeUnit.MILLISECONDS);
}
});
t.setName("ticking");
t.start();
// run some time... to observe the effects.
ThreadUtils.sleepQuietly(1, TimeUnit.HOURS);
t.interrupt();
t.join();
// eventually shutdown akka here...
}
}
しかし、これが結果です:
produced: [1]
consuming [1]
produced: [2]
produced: [3]
consuming [2] <-- Expected to be consuming 3 here.
produced: [4]
produced: [5]
consuming [3] <-- Expected to be consuming 5 here.
produced: [6]
produced: [7]
私はこれを使用していた場合、それが起こるように(単に外部 源から偽の取得データにここにスレッドのものを無視して、そこにしてください実際のプロジェクト)。
私が逃しているものは何ですか?
バックプレッシャーは 'Source.queue'では機能しません。できるだけ多くの時間、その「オファー」を呼び出すことができます。あなたは 'オファー'が返すものをチェックする必要があります。プロデューサーを消費者キューから独立させたいと思う可能性が最も高いです。 'MergeHub'を見てください。おそらくそれはあなたのためにうまくいくでしょう。 – expert