2013-04-28 10 views
14

akkaでイベントバスを使用する方法に関する良いチュートリアル/説明はありますか? Akkaのドキュメントを読んだことがありますが、イベントバスの使い方を理解するのが難しいと感じていますAkkaイベントバスチュートリアル

+0

http://www.kotancode.com/2014/02/12/using-the-akka-event-bus/ – AnonGeek

+1

建設的ではありませんか? 他に、このような質問に対する回答を見つけることができますか? –

答えて

39

良いチュートリアルがあるかどうかわかりませんが、イベントストリームを使用すると役に立つ可能性があります。しかし、高いレベルでは、イベントストリームは、アプリが持つpub/subタイプの要件を満たすための優れたメカニズムです。たとえば、システムのユーザーの残高を更新するユースケースがあるとします。残高には頻繁にアクセスするため、パフォーマンスを向上させるために残高をキャッシュすることに決めました。天びんが更新されると、ユーザーが天びんでスレッショルドを越えているかどうかを確認して確認し、そうであれば電子メールで通知します。重量が重くてユーザーの応答が遅くなるため、キャッシュのドロップまたはバランスのしきい値のチェックのいずれかをメインの残高更新コールに直接関連付ける必要はありません。この例では、AccountCacherLowBalanceChecker役者を

//Message and event classes 
case class UpdateAccountBalance(userId:Long, amount:Long) 
case class BalanceUpdated(userId:Long) 

//Actor that performs account updates 
class AccountManager extends Actor{ 
    val dao = new AccountManagerDao 

    def receive = { 
    case UpdateAccountBalance(userId, amount) => 
     val res = for(result <- dao.updateBalance(userId, amount)) yield{ 
     context.system.eventStream.publish(BalanceUpdated(userId)) 
     result     
     } 

     sender ! res 
    } 
} 

//Actor that manages a cache of account balance data 
class AccountCacher extends Actor{ 
    val cache = new AccountCache 

    override def preStart = { 
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated]) 
    } 

    def receive = { 
    case BalanceUpdated(userId) => 
     cache.remove(userId) 
    } 
} 

//Actor that checks balance after an update to warn of low balance 
class LowBalanceChecker extends Actor{ 
    val dao = new LowBalanceDao 

    override def preStart = { 
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated]) 
    } 

    def receive = { 
    case BalanceUpdated(userId) => 
     for{ 
     balance <- dao.getBalance(userId) 
     theshold <- dao.getBalanceThreshold(userId) 
     if (balance < threshold) 
     }{ 
     sendBalanceEmail(userId, balance) 
     } 
    } 
} 

を両方BalanceUpdatedイベントのクラス型でeventStreamにサブスクライブ:あなたはそうのような要件の特定のセットをモデル化することができます。このイベントがストリームに公開されたイベントの場合、これらのアクターインスタンスの両方で受信されます。次に、AccountManagerでは、残高更新が成功すると、ユーザーのためにBalanceUpdatedイベントが発生します。これが並行して行われると、そのメッセージはAccountCacherLowBalanceCheckerの両方のメールボックスに配信され、キャッシュとのバランスが落ち、アカウントのしきい値がチェックされ、電子メールが送信される可能性があります。

さて、あなただけのこれらの他の2人の俳優と直接通信するAccountManagerに直接tell (!)電話を入れている可能性がありますが、一つはそれがあまりにも密接残高更新のこれら二つの「副作用」をカップリングされる可能性があります主張することができ、それらのこと詳細の種類は必ずしもAccountManagerに属するとは限りません。純粋に副作用(コア・ビジネス・フロー自体の一部ではない)として発生する必要がある追加のもの(検査、更新など)が発生する可能性がある条件がある場合、イベント・ストリームは良い方法かもしれません募集されているイベントとそのイベントに反応する必要のある人物を切り離すことができます。

+0

ありがとうございます。ほんの少しの質問:1)イベントバスを購読して作成するだけですか?イベントバスを「破壊する」方法は? 2)イベントバスを担当する特定の俳優はいますか? 3)私はあなたが分類器を宣言していないことに気付きました、そこに選択されているデフォルトの分類器がありますか? – Tsume

+1

'ActorSystem'にはすでにイベントバスが作成されています。自分で作成する必要はありません。 'ActorSystem'がバスを作るので、私はルートガーディアンがバスを担当していると思います。質問3の意味がわからない。もう少し説明できますか? – cmbaxter

+1

私は[doc](http://doc.akka.io/docs/akka/2.1.2/scala/event-bus.html)をもう一度読んで、私自身の質問に誤解しているようです。私が言いたいのは、上記の例のアクターは、特定のメッセージ(BalanceUpdated)を受け取るために購読されているということです。さまざまなメッセージを送信できるトピックに俳優を登録するにはどうすればいいですか? – Tsume

10

ActorSystemごとに存在するEventBusがあります。このEventBusEvent Streamと呼ばれ、system.eventStreamを呼び出して取得できます。

ActorSystemはloggingDead LettersおよびCluster Eventsを含む多くのものに対してイベントストリームを使用します。

独自のパブリッシュ/サブスクライブ要件にイベントストリームを使用することもできます。たとえば、イベントストリームはテスト中に役立ちます。特定のイベント(ログイベントなど)については、Test KittestActorをイベントストリームに登録してください。expectとすることができます。これは何かが起こったときに別のアクタにメッセージを送信しない場合でもテストでイベントを期待する必要がある場合に特に便利です。

イベントストリームは1つのActorSystem内でのみ動作することに注意してください。ストリームに公開されているリモーティングイベントを使用している場合は、デフォルトではリモートシステムには行きません(自分でサポートを追加することもできます)。

イベントストリームを使用しない場合は、理論的には別のEventBusを作成することができます。

イベントバス用のより良いドキュメントは、Akka 2.2のために働いていますので、this ticketが完了したら再びチェックしてください。