2017-07-25 11 views
1

akkafuturesで障害処理を理解しようとしました。 たとえば、私は親と子のアクターを持っています。将来、親アクターにエラーを伝播する方法

チャイルドアクターは、2つの障害の場合がありますメッセージ処理
ケース2)エラーが私は両方の場合に親に誤りを伝播するが、中に必要将来

内部に発生している間
ケース1)エラーが発生し2番目のケースは起こりません。私は間違って何をしていますか?

import akka.actor.SupervisorStrategy.{Decider, Stop} 
import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy} 
import akka.testkit.{TestKit, TestProbe} 
import org.junit.{After, Before, Test} 

import scala.concurrent.Future 
import scala.util.{Failure, Success} 


class Parent(_case: String, probe: ActorRef) extends Actor { 

    val child = context.actorOf(Props(new Child(_case)), "myLittleChild") 

    final val defaultStrategy: SupervisorStrategy = { 
    def defaultDecider: Decider = { 
     case ex: Exception => 
     probe ! ex 
     Stop 
    } 

    OneForOneStrategy()(defaultDecider) 
    } 

    override def supervisorStrategy: SupervisorStrategy = defaultStrategy 

    override def receive: Receive = { 
    case msg => unhandled(msg) 
    } 

} 

class Child(_case: String) extends Actor { 

    implicit val ec = context.dispatcher 

    override def preStart(): Unit = { 
    self ! _case 
    } 


    override def receive: Receive = { 
    case "case1" => throw new RuntimeException("fail") 
    case "case2" => Future[String] { 
     throw new RuntimeException("fail") 
    }.onComplete { 
     case Success(s) => println(s) 
     case Failure(e) => 
     throw e 
    } 
    case msg => unhandled(msg) 
    } 
} 


class TestExample { 

    protected implicit var system: ActorSystem = _ 

    @Before 
    def setup(): Unit = { 
    system = ActorSystem.create("test") 
    } 

    @After 
    def tearDown(): Unit = { 
    TestKit.shutdownActorSystem(system) 
    } 

    @Test 
    def case1(): Unit = { 
    val testProbe = TestProbe() 
    system.actorOf(Props(new Parent("case1", testProbe.ref))) 
    testProbe expectMsgClass classOf[RuntimeException] 
    } 

    @Test 
    def case2(): Unit = { 
    val testProbe = TestProbe() 
    system.actorOf(Props(new Parent("case2", testProbe.ref))) 
    testProbe expectMsgClass classOf[RuntimeException] 
    } 

} 

答えて

0

あなたが俳優に例外を送信し、onCompleteコールバックの外からの例外を再スロー可能性があり、あなたのテストに合格するために取得するには:

override def receive: Receive = { 
    case "case1" => throw new RuntimeException("fail") 
    case "case2" => 
    Future[String] { 
     throw new RuntimeException("fail") 
    }.onComplete { 
     case Success(s) => println(s) 
     case Failure(e) => 
     self ! e 
    } 
    case e: RuntimeException => throw e 
    case msg => unhandled(msg) 
} 

しかし、あなたが俳優でFutureを使用しなければならない場合(例えば、メソッドがFutureを返す第三者のライブラリ)、例外を処理するより良い方法があります。たとえば、コメントで述べたデータベースAPI(databaseApi.load(): Future[Rows])を使用すると、親アクターはLoadDbメッセージを子に送信し、子はRowsまたは親にエラーメッセージを返す可能性があります。子供の行動には、次のようになります。

def receive = { 
    case LoadDb => 
    val s = sender // capture the sender 
    databaseApi 
     .load 
     .onComplete { 
     case Success(rows) => 
      s ! rows 
     case Failure(e) => 
      s ! DbFailure(e) 
     } 
    case ... 
} 

重要な注意点は、子供が内側から正しい送信元への参照を持っているために、LoadDbメッセージを受信したとき、我々はsender参照のローカルコピーを作成することですonCompleteコールバック。コールバック内でsenderを呼び出すと、senderがコールバックの実行時に変更されている可能性があります(here)。 (senderとは異なり、selfは不変であるので、それはonCompleteselfを使用しても安全です。)

+0

完全なテストコードで更新しました。 case2は失敗します... – zella

+0

@zella:更新されました。 – chunjef

1

これは親と子の間で通信する方法ではありません。 適切な方法は、失敗を含むメッセージを定義することです(例外を送信しないでください)。 次に、親はメッセージを適切に処理できます。

また、親で行う子アクターを構築することは、アクターをテストすることが非常に困難になるため好ましくありません。代わりに、子アクターファクトリ関数を引数として親アクターに渡す必要があります。これは、親アクターをテストするとき、ダミーアクター(例えば、TestActorRefまたはTestProbe)に簡単に置き換えることができます。同様に、子アクターは、適切なメッセージを親に返すために単独でテストできます。

また、俳優での「未来」の使用はお勧めしません。アクターはすでに非同期で実行されており、その時点で1つのメッセージしか処理していません。 Futureの俳優での使用を開始するとき、未来が完了するまで、俳優が間違った状態にある可能性があるので、未来がまだ完了していない間に他のメッセージを受け取るケースを処理する必要があります。フューチャーを俳優で使う方法は、book 'Effective Akka'(余分なパターン、カメオパターン)に記載されているように、一時的な俳優を使うことです。

'Effective Akka' bookは、Akkaで始まるときに読みやすいです。それには、避けるべきベストプラクティスと事柄が含まれています。とても読みやすい小さな本です。コメントをもとに

更新:あなたが俳優でそれを同期させるために決めることができました非同期(async)の俳優がすでに実行されているので、


  • あなたは2つのオプションを持っている場合。それははるかに簡単になりますが、ブロックする必要があります。
  • その他の解決策は、未来のonCompleteを処理し、親(または結果に関心のある俳優)に成功または失敗のメッセージを送信することです。個人的に私は例外をスローしません。

    親アクターに子アクターファクトリーを渡し、子(または作業者)アクターには、応答を望むアクターを渡すか、「送信者」から取得します。

    未来を呼び出す前に、「送信者」を捕捉する必要があることに注意してください。また、別のスレッドプールを使用すると、Futureのアクター自体と同じスレッドプールが使用されます。このプールを再利用するには、これは子供の俳優に渡したいものでもあり、テストのために調整することもできます。

アクターを再起動する理由がわかりません。とにかく、これはステートレスの俳優になると思われます。

子アクターの実装では、余分な/カメオのパターンを使用することが考えられます。その後、他のメッセージを受け取っていないと確信しています(終了時に俳優を止めることを忘れないでください)。しかし、それを別のアクターにすることによって、最終的に、これらのアクターのプールを作成して(ルーターを使用して)、同時のdbアクションの数を把握することができます。

+0

ありがとうございました。そして、私が先物を持ったAPIを既に持っているとしたら、それを使っている 'databaseApi.load():Future [Rows]'や 'ChildDatabaseActor'のようなものです。いくつかのdbの失敗で私はアクターを再起動したい - これは私のサンプルコードで似ています。 – zella

+0

更新された回答を見る –

関連する問題