2017-08-13 4 views
1

私は、ファイルをアップロードするためのアクターを使用している(2.4.2のakka 2.4.18)アプリケーションがあります。私は、階層akkaの俳優システムの外で応答を送信

のこの種の親スーパーバイザ俳優を持ってUploadSupervisor ---子供---> UploadActor ---子--->
DataWriteActor & MetaWriteActor

葉俳優MetaWriteActor & DataWriteActorが実際の書き込みを行います。私のコードの非常に単純化されたバージョンは以下の通りです:

class UploadSupervisor extends Actor { 
    val uploadActor = context.actorOf(Props(new UploadActor), "UploadActor") 
override def supervisorStrategy = OneForOneStrategy() { 
    case _: Throwable => Restart 
} 

override def receive: Receive = { 
    case data: Data => uploadActor ! data 
    case meta: MetaInfo => uploadActor ! meta 
    //How do I send response outside of actor system? 
    case dataSuccess: DataUploadResponse => ??? //Line 10 
    case metaSuccess: MetaUploadResponse => ??? //Line 11 

} 

object UploadSupervisor { 
    val uploadSupervisor = Akka.system 
    .actorOf(Props(new UploadSupervisor), "UploadSupervisor") 
} 
//Request & Response case classes 
case class Data(content: String) 
case class MetaInfo(id: String, createdDate: Timestamp) 

case class DataUploadResponse(location: String) 
case class MetaUploadResponse(location: String) 

UploadActor:

まず私は俳優の監督持つ -

class UploadActor extends Actor { 
val dataWriteActor = context.actorOf(Props(new DataWriteActor), "dataWriteActor") 
val metaWriteActor = context.actorOf(Props(new MetaWriteActor), "UploadActor") 

override def receive = { 
case data: Data => dataWriteActor ! data 
case meta: MetaInfo => metaWriteActor ! meta 
case dataResp: DataUploadResponse => context.parent ! dataResp 
case metaResp: MetaUploadResponse => context.parent ! metaResp 

} 
} 

DataWriteActor:

class DataWriteActor extends Actor { 
    case data: Data => //Do the writing 
        println("data write completed") 
        sender() ! DataUploadResponse("someLocation") 

} 

MetaWriteActor

class MetaWriteActor extends Actor { 
    case meta: MetaInfo=> //Do the writing 
        println(" meta info writing completed") 
        sender() ! MetaUploadResponse("someOtherLocation") 

} 

どこかで監督システムの外部: -

implicit val timeout = Timeout(10 seconds) 
val f1 = UploadSupervisor.uploadSupervisor ? Data("Hello Akka").mapTo(implicitly[scala.reflect.ClassTag[DataUploadResponse]]) 

val f2 = UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime).mapTo(implicitly[scala.reflect.ClassTag[MetaUploadResponse]]) 

//Do something with futures 

質問は俳優の系外応答を送信する方法ですか? 10行目で& 11で、私は送信者を使用できません!現在の送信者がUploadActorであるためです。 UploadSupervisorにメッセージを送信するには

class UploadSupervisor extends Actor { 
    val uploadActor = context.actorOf(Props[UploadActor], "UploadActor") 

    override val supervisorStrategy = OneForOneStrategy() { 
    case _ => Restart 
    } 

    var dataSender: Option[ActorRef] = None 
    var metaSender: Option[ActorRef] = None 

    def receive = { 
    case data: Data => 
     val s = sender 
     dataSender = Option(s) 
     uploadActor ! data 
    case meta: MetaInfo => 
     val s = sender 
     metaSender = Option(s) 
     uploadActor ! meta 
    case dataSuccess: DataUploadResponse => 
     dataSender.foreach(_ ! dataSuccess) 
    case metaSuccess: MetaUploadResponse => 
     metaSender.foreach(_ ! metaSuccess) 
    } 
} 

:あなたが最初の送信者へUploadSupervisor参考文献に保つことができる

+0

「俳優システムの外で回答を送る方法は?」あなたは外のシステムで何を意味しますか? 1つの方法は、メッセージングミドルウェアを使用して、あなたの俳優からそこにメッセージを送信することです。消費者はそれを購読して消費します。 Akkaも同様のコンセプトを使用しますが、私が知っている限り、アクターだけに限定されています。 – Imran

+0

@Imran私はUploadSupervisor(10行目と11行目)から、私が使用しているメインスレッドまでを意味します。あなたは値を取得するために質問します – Aiden

+0

@Imran私が探検できるようなメッセージングミドルウェアを提案してもらえますか?また、これらのアクターのインスタンスを複数インスタンス化するつもりはないと仮定します。 (クラスからオブジェクトへ)。その後、私は 'UploadActor.dataWriteActor?Data(" Hello Akka ")'などと呼ぶことができます。 – Aiden

答えて

1

implicit val timeout = Timeout(10 seconds) 

val f1 = (UploadSupervisor.uploadSupervisor ? Data("Hello Akka")).mapTo[DataUploadResponse] 

val f2 = (UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime)).mapTo[MetaUploadResponse] 

上記は、あなたが1つのDataメッセージを送っていることを前提と1 MetaInfoメッセージをUploadSupervisorに一度に送信してください。この方法は、複数のDataMetaInfoのメッセージを送信し、同時に返信すると予想される場合には無効になります。

case class DataMsg(data: Data, target: ActorRef) 
case class MetaInfoMsg(metaInfo: MetaInfo, target: ActorRef) 

case class DataUploadMsg(response: DataUploadResponse, target: ActorRef) 
case class MetaUploadMsg(response: MetaUploadResponse, target: ActorRef) 

class UploadSupervisor extends Actor { 
    val uploadActor = context.actorOf(Props[UploadActor], "UploadActor") 

    override val supervisorStrategy = OneForOneStrategy() { 
    case _ => Restart 
    } 

    def receive = { 
    case data: Data => 
     val s = sender 
     uploadActor ! DataMsg(data, s) 
    case meta: MetaInfo => 
     val s = sender 
     uploadActor ! MetaInfoMsg(meta, s) 
    case DataUploadMsg(response, target) => 
     target ! response 
    case MetaUploadMsg(response, target) => 
     target ! response 
    } 
} 

UploadActor:より一般的なソリューションは、既存のケースクラスをラップ追加の場合クラス、あなたの俳優の階層を通じてこの参照を渡すことで、最初の送信者への言及を含むことである

class UploadActor extends Actor { 
    val dataWriteActor = context.actorOf(Props[DataWriteActor], "dataWriteActor") 
    val metaWriteActor = context.actorOf(Props[MetaWriteActor], "UploadActor") 

    def receive = { 
    case data: DataMsg => dataWriteActor ! data 
    case meta: MetaInfoMsg => metaWriteActor ! meta 
    case dataResp: DataUploadMsg => context.parent ! dataResp 
    case metaResp: MetaUploadMsg => context.parent ! metaResp 
    } 
} 

作者:

class DataWriteActor extends Actor { 
    def receive = { 
    case DataMsg(data, target) => 
     // do the writing 
     println("data write completed") 
     sender ! DataUploadMsg(DataUploadResponse("someLocation"), target) 
    } 
} 

class MetaWriteActor extends Actor { 
    def receive = { 
    case MetaInfoMsg(meta, target) => 
     // do the writing 
     println("meta info writing completed") 
     sender ! MetaUploadMsg(MetaUploadResponse("someOtherLocation"), target) 
    } 
} 
+0

2番目のアプローチをより好感しました。 。助けをありがとう。答えを受け入れる。 – Aiden

関連する問題