2013-01-02 8 views
6

参照されていない俳優は、イベントストリームに引き続き加入していると言えるでしょうか?少なくとも、それは私がAkkaを試して得たものです。弱参照のEventbusアクターを実装しますか?

私は、EventBusシナリオでアクターの弱参照を実装しようとしています。そのような場合、イベントリスナー/アクターは通常、出入りします。常に存在するはずの独立した俳優とは異なります。もちろん明示的な登録解除は機能します。しかし、私はいつもこれを行うための正しい瞬間を知ることができません。

このような場合、Akkaは提供していますか?

val as = ActorSystem.create("weak") 
var actor = as.actorOf(Props[ExceptionHandler]) 
as.eventStream.subscribe(actor,classOf[Exception]) 

// an event is published & received 
as.eventStream.publish(new KnownProblem) 

//session expires or whatever that makes the actor redundant 
actor = null 
(1 to 30).foreach(_ => System.gc) 

// an event is published & STILL received 
as.eventStream.publish(new KnownProblem) 
+0

' unsubscribe'俳優? – idonnie

+0

それは動作します。もし私が俳優がもう何の言及もされていないことが分かっていれば、 :-)私はhttpセッションでこのシナリオを試しています。アプリケーションサーバーがセッションを終了すると、「退会」を実行する機会はありません。通常、これは弱い参照で行われます。 –

+0

EventBusは 'def eventStream'で' ActorSystem'にアタッチされます。また、設定中に 'eventStream'にいくつかのイベントが公開されます。 私は 'WeakReference [ActorRef]' -sのサブスクリプションで 'EventBus'を拡張することを提案します、特性' LookupClassification'は有望です。著者からの 引用: 'EventBusとimplement.' https://groups.google.com/forum/?fromgroups=#!topic/akka-user/T3-FONxoX8E アッカEventBus簡単な例を拡張し ます。https: //gist.github.com/3163791 – idonnie

答えて

0

これは実際に実装できませんでしたが、俳優はGCで停止しています。 Scala 2.9.2(REPL)+ Akka 2.0.3の使用。

WeakReference[ActorRef]EventBusが助けにはならなかった - アッカであなたもChildrenContainerdungeonself.children)を持っているので、またライフサイクルイベントにMonitorサブスクリプションがあるかもしれません。私が試したことはありません - 私たちの新しい輝きについて知っているディスパッチャーを持つ俳優を作成するWeakEventBus - 多分私はポイントを逃した?ここ

は、REPLのコードは(適切な輸入品でそれを開始し、:pasteそれ2の手順)行く: `classOf [例外]`分類器を使用して `EventStream`から

// Start REPL with something like: 
// scala -Yrepl-sync -classpath "/opt/akka-2.0.3/lib/akka/akka-actor-2.0.3.jar: 
// /opt/akka-2.0.3/lib/akka/akka-remote-2.0.3.jar: 
// /opt/akka-2.0.3/lib/akka/config-0.3.1.jar: 
// /opt/akka-2.0.3/lib/akka/protobuf-java-2.4.1.jar: 
// /opt/akka-2.0.3/lib/akka/netty-3.5.3.Final.jar" 

// :paste 1/2 
import akka.actor._ 
import akka.pattern._ 
import akka.event._ 
import akka.util._ 
import com.typesafe.config.ConfigFactory 
import akka.util.Timeout 
import akka.dispatch.Await 
import scala.ref.WeakReference 
import java.util.Comparator 
import java.util.concurrent.atomic._ 
import java.util.UUID 

case class Message(val id:String,val timestamp: Long) 
case class PostMessage(
    override val id:String=UUID.randomUUID().toString(), 
    override val timestamp: Long=new java.util.Date().getTime(), 
    text:String) extends Message(id, timestamp) 
case class MessageEvent(val channel:String, val message:Message) 

case class StartServer(nodeName: String) 
case class ServerStarted(nodeName: String, actor: ActorRef) 
case class IsAlive(nodeName: String) 
case class IsAliveWeak(nodeName: String) 
case class AmAlive(nodeName: String, actor: ActorRef) 
case class GcCheck() 
case class GcCheckScheduled(isScheduled: Boolean, 
    gcFlag: WeakReference[AnyRef]) 

trait WeakLookupClassification { this: WeakEventBus ⇒ 
protected final val subscribers = new Index[Classifier, 
    WeakReference[Subscriber]](mapSize(), 
    new Comparator[WeakReference[Subscriber]] { 
      def compare(a: WeakReference[Subscriber], 
     b: WeakReference[Subscriber]): Int = { 
       if (a.get == None || b.get == None) -1 
       else compareSubscribers(a.get.get, b.get.get) 
     } 
     }) 
protected def mapSize(): Int 
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int 
protected def classify(event: Event): Classifier 
protected def publish(event: Event, subscriber: Subscriber): Unit 
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = 
    subscribers.put(to, new WeakReference(subscriber)) 
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = 
    subscribers.remove(from, new WeakReference(subscriber)) 
def unsubscribe(subscriber: Subscriber): Unit = 
    subscribers.removeValue(new WeakReference(subscriber)) 
def publish(event: Event): Unit = { 
     val i = subscribers.valueIterator(classify(event)) 
     while (i.hasNext) publish(event, i.next().get.get) 
} 
    } 

class WeakEventBus extends EventBus with WeakLookupClassification { 
    type Event = MessageEvent 
    type Classifier=String 
    type Subscriber = ActorRef 

    protected def compareSubscribers(a: ActorRef, b: ActorRef) = a compareTo b 

    protected def mapSize(): Int = 10 
    protected def classify(event: Event): Classifier = event.channel 
    protected def publish(event: Event, subscriber: Subscriber): Unit = 
     subscriber ! event 
} 

lazy val weakEventBus = new WeakEventBus 

implicit val timeout = akka.util.Timeout(1000) 
lazy val actorSystem = ActorSystem("serversys", ConfigFactory.parseString(""" 
akka { 
    loglevel = "DEBUG" 
    actor { 
     provider = "akka.remote.RemoteActorRefProvider" 
     debug { 
      receive = on 
      autoreceive = on   
      lifecycle = on 
      event-stream = on 
     } 
    } 
    remote { 
     transport = "akka.remote.netty.NettyRemoteTransport" 
     log-sent-messages = on 
     log-received-messages = on  
    } 
} 
serverconf { 
    include "common" 
    akka { 
     actor { 
      deployment { 
     /root { 
      remote = "akka://[email protected]:2552" 
     }  
      } 
     } 
     remote { 
      netty { 
     hostname = "127.0.0.1" 
     port = 2552 
      } 
     } 
    } 
} 
""").getConfig("serverconf")) 

class Server extends Actor { 
    private[this] val scheduled = new AtomicBoolean(false) 
    private[this] val gcFlagRef = new AtomicReference[WeakReference[AnyRef]]() 

    val gcCheckPeriod = Duration(5000, "millis") 

    override def preRestart(reason: Throwable, message: Option[Any]) { 
     self ! GcCheckScheduled(scheduled.get, gcFlagRef.get) 
     super.preRestart(reason, message) 
    } 

    def schedule(period: Duration, who: ActorRef) = 
     actorSystem.scheduler.scheduleOnce(period)(who ! GcCheck) 

    def receive = {  
     case StartServer(nodeName) => 
      sender ! ServerStarted(nodeName, self) 
      if (scheduled.compareAndSet(false, true)) 
     schedule(gcCheckPeriod, self) 
      val gcFlagObj = new AnyRef()    
      gcFlagRef.set(new WeakReference(gcFlagObj)) 
      weakEventBus.subscribe(self, nodeName) 
      actorSystem.eventStream.unsubscribe(self)  
     case GcCheck => 
      val gcFlag = gcFlagRef.get 
      if (gcFlag == null) { 
     sys.error("gcFlag") 
      } 
     gcFlag.get match { 
     case Some(gcFlagObj) => 
      scheduled.set(true) 
      schedule(gcCheckPeriod, self) 
     case None => 
      println("Actor stopped because of GC: " + self) 
      context.stop(self)   
     } 
     case GcCheckScheduled(isScheduled, gcFlag) => 
      if (isScheduled && scheduled.compareAndSet(false, isScheduled)) { 
     gcFlagRef.compareAndSet(null, gcFlag) 
     schedule(gcCheckPeriod, self)    
      } 
     case IsAlive(nodeName) => 
      println("Im alive (default EventBus): " + nodeName) 
      sender ! AmAlive(nodeName, self) 
     case e: MessageEvent => 
      println("Im alive (weak EventBus): " + e)  
    } 
} 

// :paste 2/2 
class Root extends Actor { 
    def receive = { 
     case start @ StartServer(nodeName) => 
     val server = context.actorOf(Props[Server], nodeName) 
     context.watch(server) 
     Await.result(server ? start, timeout.duration) 
     .asInstanceOf[ServerStarted] match { 
     case started @ ServerStarted(nodeName, _) => 
      sender ! started 
     case _ => 
      throw new RuntimeException(
      "[S][FAIL] Could not start server: " + start) 
     } 
     case isAlive @ IsAlive(nodeName) => 
     Await.result(context.actorFor(nodeName) ? isAlive, 
     timeout.duration).asInstanceOf[AmAlive] match { 
     case AmAlive(nodeName, _) => 
      println("[S][SUCC] Server is alive : " + nodeName) 
     case _ => 
     throw new RuntimeException("[S][FAIL] Wrong answer: " + nodeName)  
      } 
     case isAliveWeak @ IsAliveWeak(nodeName) =>     
     actorSystem.eventStream.publish(MessageEvent(nodeName, 
     PostMessage(text="isAlive-default"))) 
     weakEventBus.publish(MessageEvent(nodeName, 
     PostMessage(text="isAlive-weak"))) 
} 
    } 

lazy val rootActor = actorSystem.actorOf(Props[Root], "root") 

object Root { 
    def start(nodeName: String) = { 
     val msg = StartServer(nodeName) 
     var startedActor: Option[ActorRef] = None 
     Await.result(rootActor ? msg, timeout.duration) 
     .asInstanceOf[ServerStarted] match { 
      case succ @ ServerStarted(nodeName, actor) => 
      println("[S][SUCC] Server started: " + succ) 
      startedActor = Some(actor) 
      case _ => 
     throw new RuntimeException("[S][FAIL] Could not start server: " + msg) 
      } 
     startedActor 
    } 
    def isAlive(nodeName: String) = rootActor ! IsAlive(nodeName) 
    def isAliveWeak(nodeName: String) = rootActor ! IsAliveWeak(nodeName) 
} 

//////////////// 
// actual test 
Root.start("weak") 
Thread.sleep(7000L) 
System.gc() 
Root.isAlive("weak") 
関連する問題