subscribe()
を呼び出す前に、リアクターでは、Subscription
を取り消したいという意味はありませんメソッドを使用してSubscription
を作成し、チェーンの上にその信号を伝播してデータの放出を開始します)。
すべてのサブスクリプションを持つ集中化された場所はありません。キャンセルする特定のサブスクリプションを見つける方法が必要なのであまり意味がありません(チェーン内の各オペレータが中間サブスクリプションも...)。
一部の事業者は、あなたの代わりにサブスクリプションをキャンセルすることがあります。
Flux.just(1, 2, 3, 4).log().take(2).subscribe(System.out::println);
意志出力:
14:17:48.729 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | onNext(1)
1
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | onNext(2)
2
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | cancel()
をああ、私はその演算子については知りませんでした:それは十分なアイテムが放出された後、上流キャンセルさせていただきます例えば
take(int)
用ケース、ある Oしかし、私のために最も重要なことは、CancellationオブジェクトがOnCancel信号を適切に処理できる信号を送信していたことでした。しかし、問題は回避策を見つけることができます。Fluxに例外を投げてストリームをキャンセルするので、悪いことはありません。 – Kapitalny'Cancellation'オブジェクトの使用をお勧めします。 3.1では 'Disposable'になります(その時点で' cancel() 'ではなく' dispose() 'を呼び出さなければなりません)。あなたがしたいことに自然にマッチする演算子を探したり、必要に応じてキャンセルしたりすることができます。流用に例外を投げることは、ユースケースに応じて、あまりにも良い解決法とは言えません。 –
@SimonBasléあなたのコードを実行し、 'take(2)'が 'log()'の前にある場合、 'cancel()'シグナルは出力されません。どうして? 'take '演算子は、ソースではなく、フラックスをキャンセルすると言いました。逆に –