2017-05-18 8 views
0

最初の項目の後にAkka-Stream(今日の2.11_2.5-SNAPSHOT)Sourceを取り消す予定の次の(簡略化された)コンシューマがありますが、onNextはまだ4回:その要求を考えるとAkka-Stream Sourceを熱心に停止します

static Subscriber<Object> println() { 
    return new Subscriber<Object>() { 

     Subscription s; 

     int n; 

     @Override 
     public void onSubscribe(Subscription s) { 
      this.s = s; 
      s.request(5); 
     } 

     @Override 
     public void onNext(Object t) { 
      System.out.println(Thread.currentThread().getName() 
        + ": " + t + " - " + (++n)); 
      if (s != null) { 
       s.cancel(); 
       s = null; 
      } 
     } 

     @Override 
     public void onError(Throwable t) { 
      t.printStackTrace(); 
     } 

     @Override 
     public void onComplete() { 
      System.out.println(Thread.currentThread().getName() + ": DONE"); 
     } 
    }; 
} 

public static void main(String[] args) throws Exception { 
    Config cfg = ConfigFactory.parseResources(
     AkkaRange.class, "/akka-streams.conf").resolve(); 
    ActorSystem actorSystem = ActorSystem.create("sys", cfg); 

    ActorMaterializer materializer = ActorMaterializer.create(actorSystem); 

    Source<Integer, NotUsed> source = Source.repeat(1); 

    Publisher<Integer> p = source.runWith(Sink.asPublisher(
     AsPublisher.WITH_FANOUT), materializer); 

    p.subscribe(println()); 

    Thread.sleep(1000); 

    actorSystem.terminate(); 
} 

はまだわずか4コールが行われた5ですが、私は基本的なメッセージングアーキテクチャは、キャンセル(またはさらに要求)メッセージのメッセージキューをチェックする前に、4-バッチで要求に応答すると仮定します。

キャンセルをより積極的に行うための設定はありますか?

ユースケース1-2ソース要素および下流は、この場合ストリームを解除した後、所望の結果を生成することができる計算集約ステージ(マップ)がある相互運用計算のようなものです。問題は、この4バッチのために残りの2〜3の要素についても計算が実行されることです。

+0

これは公平性の問題です。 'map'で長時間実行している計算をすると、いくつかのシグナルが熱心に登録されるのを妨げる可能性があります。 1) 'mapAsync'を代わりに使い、長時間の計算をオフサイドで実行するか、2)' actor.stream.materializer.sync-processing-limit'を低くして、外部信号がより速く処理されるようにする(スループットに不利になります、しかし)。 – jrudolph

+0

ありがとう、彼らは合理的なオプションのようです。スループットを必要とする他のAkkaストリームがないので、オプション2を使用します。回答としてあなたのコメントを投稿できますか? – akarnokd

答えて

0

Subscriberインターフェイスはa part ofです。リアクティブストリーム仕様です。これには、多くのライブラリが含まれており、akka-streamsが実装されています。そして、この仕様の状態以下のもの:がある場合、加入者がSubscription.cancel(呼ばれた後、一つ以上のonNext信号を受信するように準備しなければなりません

)がまだ保留中の要素を[3.12を参照してください]要求しました。 Subscription.cancel()は、基本となるクリーニング操作をすぐに実行することを保証しません。

したがって、加入者でこの状況を手動で処理する必要があります。そうしないと、仕様に違反し、仕様を実装するライブラリで使用できなくなります。

+0

私は手紙のスペックを知っていて、それが私の加入者だけだったならば、私はさらに次のものを無視したいと思います。問題は、1回ではなく4回実行されるplain map()操作で、Akka Stream側で重い計算が実行されることです。私はmap()の関数の実装を制御できません。 Rxのように取り消しに熱心でも、仕様に違反していません。 – akarnokd

+0

@akarnokdあなたのサブスクライバで 'request(5)'を発行してから 'cancel'を発行しましたか? –

関連する問題