私はあなたが必要と思うのは、より新しいメッセージが優先される優先メールボックスです。 PriorityMailboxのデフォルトの実装を見てください。
それは(ドキュメントの例に基づいて)このようなものになります。
ここ
versionToInt
import akka.dispatch.PriorityGenerator
import akka.dispatch.UnboundedStablePriorityMailbox
import com.typesafe.config.Config
type Version = // Long, Date, Timestamp, smth else - must be ordered
case class MyMessage(key: String, value: String, version: Version)
class MyPrioMailbox(settings: ActorSystem.Settings, config: Config)
extends UnboundedStablePriorityMailbox(
// Create a new PriorityGenerator, lower prio means more important
PriorityGenerator {
case MyMessage(_, _, version) => versionToInt(version)
// PoisonPill when no other left
case PoisonPill => 1 // or Int.MaxValue - 2
// We default to 1, which is in between high and low
case otherwise => 0 // or Int.MaxValue - 1
})
メッセージ(優先順位が高い)の新しいバージョンの低い一部のInt
値を返す必要がありますが、2 - Inf
で言います範囲。
その後、あなたの俳優で処理された最高のバージョンを追跡し、そのバージョンより古い他のすべてのメッセージをドロップすることができます。あなたの俳優にはbecome
またはvar
を使用してください。
送信されたメッセージの順序はAkkaによって保証されますが、最終的に送信者と送信者の間で遅延が発生し、処理されるメッセージに影響します。
なぜ、最新の値をプレフィルタリングして渡すのはなぜですか?おそらく、あなたのユースケースを理解するのに役立つ例があります。私はサブトピックの理由を見ませんか? – sascha10000
アプリケーションはHFTドメインにあり、入力は市場テロップの流れです。最新のテロップを処理した後は、古い市場データを処理することはできません。したがって、「トピック」または「キー」はティッカーシンボルになり、値は価格になります。 – sdfdfjndfsd
それでは、akka-streamsを介してストリームをあらかじめ処理しておき、key:valueのペアの処理が等しい場合にのみBalancingPoolに "実際の" tickinfosを送ります。そうでない場合は、複数のActorが実装されている場合は、最初にストリームをフィルタリングし、処理するアクタを決定できます。基本的には、RootActorがフィルタリングを処理したActorシステムを構築して、それを子プロセスに送信します。 "1つの"ものとそれは非常に良いことを行うべきです。 – sascha10000