最初の項目の後にAkka-Stream(今日の2.11_2.5-SNAPSHOT)Source
を取り消す予定の次の(簡略化された)コンシューマがありますが、onNext
はまだ4回:その要求を考えるとAkka-Stream Sourceを熱心に停止します
static Subscriber<Object> println() {
return new Subscriber<Object>() {
Subscription s;
int n;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(5);
}
@Override
public void onNext(Object t) {
System.out.println(Thread.currentThread().getName()
+ ": " + t + " - " + (++n));
if (s != null) {
s.cancel();
s = null;
}
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + ": DONE");
}
};
}
public static void main(String[] args) throws Exception {
Config cfg = ConfigFactory.parseResources(
AkkaRange.class, "/akka-streams.conf").resolve();
ActorSystem actorSystem = ActorSystem.create("sys", cfg);
ActorMaterializer materializer = ActorMaterializer.create(actorSystem);
Source<Integer, NotUsed> source = Source.repeat(1);
Publisher<Integer> p = source.runWith(Sink.asPublisher(
AsPublisher.WITH_FANOUT), materializer);
p.subscribe(println());
Thread.sleep(1000);
actorSystem.terminate();
}
はまだわずか4コールが行われた5ですが、私は基本的なメッセージングアーキテクチャは、キャンセル(またはさらに要求)メッセージのメッセージキューをチェックする前に、4-バッチで要求に応答すると仮定します。
キャンセルをより積極的に行うための設定はありますか?
ユースケース1-2ソース要素および下流は、この場合ストリームを解除した後、所望の結果を生成することができる計算集約ステージ(マップ)がある相互運用計算のようなものです。問題は、この4バッチのために残りの2〜3の要素についても計算が実行されることです。
これは公平性の問題です。 'map'で長時間実行している計算をすると、いくつかのシグナルが熱心に登録されるのを妨げる可能性があります。 1) 'mapAsync'を代わりに使い、長時間の計算をオフサイドで実行するか、2)' actor.stream.materializer.sync-processing-limit'を低くして、外部信号がより速く処理されるようにする(スループットに不利になります、しかし)。 – jrudolph
ありがとう、彼らは合理的なオプションのようです。スループットを必要とする他のAkkaストリームがないので、オプション2を使用します。回答としてあなたのコメントを投稿できますか? – akarnokd