Akkaクラスタアプリケーションのコンテキストでは、Akkaが期待する1つのプロパティに関する問題が発生しました。すべての(cas)クラスと使用されるすべてのメッセージは直列化可能でなければなりません。私は以下のコンテキストを持っています:Redisクラスタからデータを消費したいので、クラスタ対応のルータプールを採用して、ノードを追加してより多くの作業者を確保することにしました。作業者はredisからデータを読み取り、mongodbにいくつかのメタデータを保存します。Akkaクラスタ対応ルータ - すべてのルートにredisインスタンスを共有
object MasterWorkers {
def props
( awsBucket : String,
gapMinValueMicroSec : Long,
persistentCache: RedisCache,
mongoURI : String,
mongoDBName : String,
mongoCollectioName : String
) : Props =
Props(MasterWorkers(awsBucket, gapMinValueMicroSec, persistentCache, mongoURI, mongoDBName, mongoCollectioName))
case class JobRemove(deviceId: DeviceId, from : Timestamp, to : Timestamp)
}
case class MasterWorkers
(
awsBucket : String,
gapMinValueMicroSec : Long,
persistentCache: RedisCache,
mongoURI : String,
mongoDBName : String,
mongoCollectioName : String
) extends Actor with ActorLogging {
val workerRouter =
context.actorOf(FromConfig.props(Props(classOf[Worker],awsBucket,gapMinValueMicroSec, self, persistentCache, mongoURI, mongoDBName, mongoCollectioName)),
name = "workerRouter")
ワーカークラス:
object Worker {
def props
(
awsBucket : String,
gapMinValueMicroSec : Long,
replyTo : ActorRef,
persistentCache: RedisCache,
mongoURI : String,
mongoDBName : String,
mongoCollectioName : String
) : Props =
Props(Worker(awsBucket, gapMinValueMicroSec, replyTo, persistentCache, mongoURI, mongoDBName, mongoCollectioName))
case class JobDumpFailed(deviceId : DeviceId, from: Timestamp, to: Timestamp)
case class JobDumpSuccess(deviceId : DeviceId, from: Timestamp, to: Timestamp)
case class JobRemoveFailed(deviceId : DeviceId, from: Timestamp, to: Timestamp)
}
case class Worker
(
awsBucket : String,
gapMinValueMicroSec : Long,
replyTo : ActorRef,
persistentCache: RedisCache,
mongoURI : String,
mongoDBName : String,
mongoCollectioName : String
) extends Actor with ActorLogging {
しかし、私は二つのノード始めたとき、これは、以下の例外が発生します:Redisのキャッシュは簡単です
[info] akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.remote.DaemonMsgCreate] using serializer [class akka.remote.serialization.DaemonMsgCreateSerializer].
[info] at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61)
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895)
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895)
[info] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
[info] at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:894)
[info] at akka.remote.EndpointWriter.writeSend(Endpoint.scala:786)
[info] at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:761)
[info] at akka.actor.Actor$class.aroundReceive(Actor.scala:497)
[info] at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:452)
[info] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
[info] at akka.actor.ActorCell.invoke(ActorCell.scala:495)
[info] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
[info] at akka.dispatch.Mailbox.run(Mailbox.scala:224)
[info] at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
[info] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[info] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[info] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[info] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[info] Caused by: java.io.NotSerializableException: akka.actor.ActorSystemImpl
を最初のバージョンでは、私はこれをしませんでした次のようなインタフェースを実装したコンパニオンオブジェクトのケースクラスです。
その後、私は労働者でredisCache
を移動し、問題を解決するために、私はマスターノードにそれを与えていないよ:
case class Worker
(
awsBucket : String,
gapMinValueMicroSec : Long,
replyTo : ActorRef,
mongoURI : String,
mongoDBName : String,
mongoCollectioName : String
) extends Actor with ActorLogging {
// redis cache here now
val redisCache = ...
しかし、そのようなデザインで、あらゆるrouteeはRedisの新しいインスタンスを作成します。キャッシュと予想される動作ではありません。私が望むのは、私のredisキャッシュのインスタンスを1つ作成してそれをすべてのルートと共有することですが、クラスタアプリケーションのコンテキストでは不可能なようですので、設計上の失敗か経験不足かはわかりませんAkkaと一緒に。誰かが同様の問題に直面した場合、私は喜んでアドバイスを受け取ります!
をその周り
ActorRequest
を構築することができ、それは各ワーカーはRedisのクライアントをインスタンス化することを意味しますか? – alifiratはい。私はちょうどそれがあなたが興味を持っている場合は、1つの接続を共有することを含むもう1つのアプローチを追加しました。 –
はい、私は代わりに考えていませんでした。私はそれを試してあなたに知らせるでしょう! – alifirat