2016-08-03 4 views
0

でバージョン管理、私は次のユースケースを持っていることを確認します:は、アッカ・メッセージがユニーク/いくつかの属性

俳優がkey:valueペアで構成され、いくつかの外部ソースからのデータを消費します。その後、彼らはkeyと最新のvalueでいくつかのアクションを実行する必要がある俳優に渡され、古いものをスキップします。基本的に私はメールボックスを1の容量の "サブトピック"に分割しようとしています。 Akkaでそのようなことは可能ですか?そうでない場合、私はどのような選択肢を検討すべきですか?

+0

なぜ、最新の値をプレフィルタリングして渡すのはなぜですか?おそらく、あなたのユースケースを理解するのに役立つ例があります。私はサブトピックの理由を見ませんか? – sascha10000

+0

アプリケーションはHFTドメインにあり、入力は市場テロップの流れです。最新のテロップを処理した後は、古い市場データを処理することはできません。したがって、「トピック」または「キー」はティッカーシンボルになり、値は価格になります。 – sdfdfjndfsd

+0

それでは、akka-streamsを介してストリームをあらかじめ処理しておき、key:valueのペアの処理が等しい場合にのみBalancingPoolに "実際の" tickinfosを送ります。そうでない場合は、複数のActorが実装されている場合は、最初にストリームをフィルタリングし、処理するアクタを決定できます。基本的には、RootActorがフィルタリングを処理したActorシステムを構築して、それを子プロセスに送信します。 "1つの"ものとそれは非常に良いことを行うべきです。 – sascha10000

答えて

0

私はあなたが必要と思うのは、より新しいメッセージが優先される優先メールボックスです。 PriorityMailboxのデフォルトの実装を見てください。

それは(ドキュメントの例に基づいて)このようなものになります。

ここ versionToInt
import akka.dispatch.PriorityGenerator 
import akka.dispatch.UnboundedStablePriorityMailbox 
import com.typesafe.config.Config 

type Version = // Long, Date, Timestamp, smth else - must be ordered 
case class MyMessage(key: String, value: String, version: Version) 

class MyPrioMailbox(settings: ActorSystem.Settings, config: Config) 
    extends UnboundedStablePriorityMailbox(
    // Create a new PriorityGenerator, lower prio means more important 
    PriorityGenerator { 
     case MyMessage(_, _, version) => versionToInt(version) 

     // PoisonPill when no other left 
     case PoisonPill => 1 // or Int.MaxValue - 2 

     // We default to 1, which is in between high and low 
     case otherwise  => 0 // or Int.MaxValue - 1 
    }) 

メッセージ(優先順位が高い)の新しいバージョンの低い一部のInt値を返す必要がありますが、2 - Infで言います範囲。

その後、あなたの俳優で処理された最高のバージョンを追跡し、そのバージョンより古い他のすべてのメッセージをドロップすることができます。あなたの俳優にはbecomeまたはvarを使用してください。

送信されたメッセージの順序はAkkaによって保証されますが、最終的に送信者と送信者の間で遅延が発生し、処理されるメッセージに影響します。

+0

私はこのアプローチを考えましたが、それは遅くないでしょうか?私はすべてのアクターインスタンス間で共有変数が必要になり、同期/アトミック値を使用するようになります。私はAkkaには新しく、すべてを完全に理解していない。また、私はいくつかのセット(質問へのコメントのより多くの説明)の間でのみ、メッセージが「ユニーク」であることが必要であり、ストリーム全体では必要としません。私はバージョンの共有マップを持つことでこのアプローチを変更しようとすることができますが、それはa)すべてをダウンさせるb)古いキーを追い出すためのメカニズムが必要です。 – sdfdfjndfsd

+0

共有変数はありません。キーごとにアクターを作成する必要があります。無視するメッセージは安いでしょう。最新のメッセージのバージョンを表す変更可能な状態は、アクター内で安全にカプセル化されます。 JVMあたり何百万人もの俳優のためにはうまくいくはずです –

+0

しかし、将来のメッセージが同じデスティネーションアクターにルーティングされるようにするにはどうすればよいですか?カスタムルータを作成することによって? – sdfdfjndfsd

関連する問題