2017-11-24 5 views
1

私はAkka Stream 2.5.5を使用しています。私はStreamを持っています:Stream加入者はいつサブスクリプションをキャンセルしますか?

  • アクターがストリームのソースとして機能しています。私はこれを使用して:Source.actorPublisher
  • ストリームには複数のステージがあります。 mapcollectなどの場合は、発生する可能性のある例外を処理するためにRecoverを使用しています。私はストリームを実行すると.withAttributes(supervisionStrategy(resumingDecider)))

、私は私のソースの俳優に伝播akka.stream.actor.ActorPublisherMessage.Cancelを取得しています:mapAsync()からエラーを処理するため

  • 、私は監督の戦略を使用しています。ドキュメントから:

    /** 
        * This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the 
        * subscription. 
        */ 
        final case object Cancel extends Cancel with NoSerializationVerificationNeeded 
        sealed abstract class Cancel extends ActorPublisherMessage 
    

    驚くべきことに、ストリーム内の任意のstageでスローされる任意のExceptionはありません。だから私は理解できていない。why the stream subscriber cancels the subscription。したがって、私のストリームが失敗している理由について、正確なCauseまたはErrorを見つけることができません。

    このシナリオに関する洞察と推論は非常に役立ちます。

  • +0

    「シンク」以外はすべて説明しました。ストリームのコードも見せてくれれば助けが簡単です。 –

    答えて

    0

    私は最終的に私のストリームの最後の段階としてrecover stageを使用して原因を見つけました。以前は、ストリームの各段階の後でエラーをログに記録し、最後に目的の値を単純に渡す段階の後に複数の段を複数使用していました。

    は今、私は以下のように見える私のストリームの最後に一つだけrecover stageを使用します。これを追加した後

    .recover { 
          case e => 
          println("****************************") 
          println("Throwing Exception. Cause: "+e.getMessage) 
          println("****************************") 
          throw e 
         } 
    

    、実際の例外がスローされたと私は、実際の原因を発見しました。私の場合、ステージの1つに例外を投げたコードがありました。

    関連する問題