私はAkka Stream 2.5.5を使用しています。私はStream
を持っています:Stream加入者はいつサブスクリプションをキャンセルしますか?
- アクターがストリームのソースとして機能しています。私はこれを使用して:
Source.actorPublisher
- ストリームには複数のステージがあります。
map
、collect
などの場合は、発生する可能性のある例外を処理するためにRecoverを使用しています。私はストリームを実行すると.withAttributes(supervisionStrategy(resumingDecider)))
、私は私のソースの俳優に伝播akka.stream.actor.ActorPublisherMessage.Cancel
を取得しています:mapAsync()からエラーを処理するため
/**
* This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the
* subscription.
*/
final case object Cancel extends Cancel with NoSerializationVerificationNeeded
sealed abstract class Cancel extends ActorPublisherMessage
驚くべきことに、ストリーム内の任意のstage
でスローされる任意のException
はありません。だから私は理解できていない。why the stream subscriber cancels the subscription
。したがって、私のストリームが失敗している理由について、正確なCause
またはError
を見つけることができません。
このシナリオに関する洞察と推論は非常に役立ちます。
「シンク」以外はすべて説明しました。ストリームのコードも見せてくれれば助けが簡単です。 –