2017-05-28 12 views
2

私はakkaストリームを試していますが、私の単純な例では背圧を得ることができません。私はakka(ストリーム)の経験がないので、おそらく私は大きなものを見逃しています。akkaストリームのバックプレッシャを理解するSource.queue

私はそれらを消費するよりも速く整数を生成しているので、バックプレッシャーが入り込むと思いました。 私の目標は、キューに入れられた最新のアイテムを常に消費することです。 bufferSize = 1および OverflowStrategy.dropHead()(ソースキュー上)。

public class SimpleStream { 
    public static void main(String[] argv) throws InterruptedException { 
     final ActorSystem system = ActorSystem.create("akka-streams"); 
     final Materializer materializer = ActorMaterializer.create(system); 

     final Procedure<Integer> slowConsumer = (i) -> { 
      System.out.println("consuming [" + i + "]"); 
      ThreadUtils.sleepQuietly(1000, TimeUnit.MILLISECONDS); 
     }; 

     final SourceQueue<Integer> q = Sink 
       .<Integer>foreach(slowConsumer) 
       .runWith(Source.<Integer>queue(1, OverflowStrategy.dropHead()), materializer); 

     final AtomicInteger i = new AtomicInteger(0); 
     final Thread t = new Thread(() -> { 
      while (!Thread.currentThread().isInterrupted()) { 
       int n = i.incrementAndGet(); 
       q.offer(n); 
       System.out.println("produced: [" + n + "]"); 
       ThreadUtils.sleepQuietly(500, TimeUnit.MILLISECONDS); 
      } 
     }); 
     t.setName("ticking"); 
     t.start(); 

     // run some time... to observe the effects. 
     ThreadUtils.sleepQuietly(1, TimeUnit.HOURS); 
     t.interrupt(); 
     t.join(); 

     // eventually shutdown akka here... 
    } 
} 

しかし、これが結果です:

produced: [1] 
consuming [1] 
produced: [2] 
produced: [3] 
consuming [2] <-- Expected to be consuming 3 here. 
produced: [4] 
produced: [5] 
consuming [3] <-- Expected to be consuming 5 here. 
produced: [6] 
produced: [7] 

私はこれを使用していた場合、それが起こるように(単に外部 源から偽の取得データにここにスレッドのものを無視して、そこにしてください実際のプロジェクト)。

私が逃しているものは何ですか?

+0

バックプレッシャーは 'Source.queue'では機能しません。できるだけ多くの時間、その「オファー」を呼び出すことができます。あなたは 'オファー'が返すものをチェックする必要があります。プロデューサーを消費者キューから独立させたいと思う可能性が最も高いです。 'MergeHub'を見てください。おそらくそれはあなたのためにうまくいくでしょう。 – expert

答えて

0

Source.queueは、バックプレッシャーのシグナリングを終了します。 Source.queueメソッドにはOverflowStrategyが含まれています。バックプレッシャーがキューを通過して上流側に伝えられる場合、キューがオーバーフローする可能性がある状況に対処する必要はありません。しかし、バックプレッシャがキューを超えて伝播しないので、コンシューマよりも速いプロデューサを処理するための戦略を定義する必要があります。

典型的なストリームの場合、究極のSourceSinkからの要求を受けて、さらに多くの結果を得ます。しかし、Source.queueから作成されたストリームでは、「究極のソース」はキューです。このキューはコンテンツがある場合にのみ排除することができます。アップストリームはofferメソッドの反対側にあるため、より多くの結果を生成するためにアップストリームに信号を送ることはできません。

+0

「バックプレッシャー」をキューの上方に伝播させることはできません。私の質問では、キューでオーバーフロー戦略を実施する "背圧"と呼んでいます。名前にもかかわらず、そのオーバーフロー戦略が機能していれば(つまり背圧でもなくても)うまくいくでしょう。 –

関連する問題