私は、ファイルをアップロードするためのアクターを使用している(2.4.2のakka 2.4.18)アプリケーションがあります。私は、階層akkaの俳優システムの外で応答を送信
のこの種の親スーパーバイザ俳優を持ってUploadSupervisor ---子供---> UploadActor ---子--->
DataWriteActor & MetaWriteActor
葉俳優MetaWriteActor & DataWriteActorが実際の書き込みを行います。私のコードの非常に単純化されたバージョンは以下の通りです:
class UploadSupervisor extends Actor {
val uploadActor = context.actorOf(Props(new UploadActor), "UploadActor")
override def supervisorStrategy = OneForOneStrategy() {
case _: Throwable => Restart
}
override def receive: Receive = {
case data: Data => uploadActor ! data
case meta: MetaInfo => uploadActor ! meta
//How do I send response outside of actor system?
case dataSuccess: DataUploadResponse => ??? //Line 10
case metaSuccess: MetaUploadResponse => ??? //Line 11
}
object UploadSupervisor {
val uploadSupervisor = Akka.system
.actorOf(Props(new UploadSupervisor), "UploadSupervisor")
}
//Request & Response case classes
case class Data(content: String)
case class MetaInfo(id: String, createdDate: Timestamp)
case class DataUploadResponse(location: String)
case class MetaUploadResponse(location: String)
UploadActor:
まず私は俳優の監督持つ -
class UploadActor extends Actor {
val dataWriteActor = context.actorOf(Props(new DataWriteActor), "dataWriteActor")
val metaWriteActor = context.actorOf(Props(new MetaWriteActor), "UploadActor")
override def receive = {
case data: Data => dataWriteActor ! data
case meta: MetaInfo => metaWriteActor ! meta
case dataResp: DataUploadResponse => context.parent ! dataResp
case metaResp: MetaUploadResponse => context.parent ! metaResp
}
}
DataWriteActor:
class DataWriteActor extends Actor {
case data: Data => //Do the writing
println("data write completed")
sender() ! DataUploadResponse("someLocation")
}
MetaWriteActor
をclass MetaWriteActor extends Actor {
case meta: MetaInfo=> //Do the writing
println(" meta info writing completed")
sender() ! MetaUploadResponse("someOtherLocation")
}
どこかで監督システムの外部: -
implicit val timeout = Timeout(10 seconds)
val f1 = UploadSupervisor.uploadSupervisor ? Data("Hello Akka").mapTo(implicitly[scala.reflect.ClassTag[DataUploadResponse]])
val f2 = UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime).mapTo(implicitly[scala.reflect.ClassTag[MetaUploadResponse]])
//Do something with futures
質問は俳優の系外応答を送信する方法ですか? 10行目で& 11で、私は送信者を使用できません!現在の送信者がUploadActorであるためです。 UploadSupervisor
にメッセージを送信するには
class UploadSupervisor extends Actor {
val uploadActor = context.actorOf(Props[UploadActor], "UploadActor")
override val supervisorStrategy = OneForOneStrategy() {
case _ => Restart
}
var dataSender: Option[ActorRef] = None
var metaSender: Option[ActorRef] = None
def receive = {
case data: Data =>
val s = sender
dataSender = Option(s)
uploadActor ! data
case meta: MetaInfo =>
val s = sender
metaSender = Option(s)
uploadActor ! meta
case dataSuccess: DataUploadResponse =>
dataSender.foreach(_ ! dataSuccess)
case metaSuccess: MetaUploadResponse =>
metaSender.foreach(_ ! metaSuccess)
}
}
:あなたが最初の送信者へUploadSupervisor
参考文献に保つことができる
「俳優システムの外で回答を送る方法は?」あなたは外のシステムで何を意味しますか? 1つの方法は、メッセージングミドルウェアを使用して、あなたの俳優からそこにメッセージを送信することです。消費者はそれを購読して消費します。 Akkaも同様のコンセプトを使用しますが、私が知っている限り、アクターだけに限定されています。 – Imran
@Imran私はUploadSupervisor(10行目と11行目)から、私が使用しているメインスレッドまでを意味します。あなたは値を取得するために質問します – Aiden
@Imran私が探検できるようなメッセージングミドルウェアを提案してもらえますか?また、これらのアクターのインスタンスを複数インスタンス化するつもりはないと仮定します。 (クラスからオブジェクトへ)。その後、私は 'UploadActor.dataWriteActor?Data(" Hello Akka ")'などと呼ぶことができます。 – Aiden