1

背圧機能を備えたアクターベースのシステムを実装しようとしています。要件として、マスタープロセスはJSON形式のストリーミングデータを受け取ります。ただし、各JSONイベントには、{ip: '123.43.12.1'、国: 'US'、...など}のようなフィールドがあります。 JSONの構造はあらかじめわかっています。ファンアウト機能を備えたリアクティブストリームアクターシステムの設計方法

ここで、JSON構造を何らかの形で(キー、値)のペアに平坦化する必要があります。たとえば、上記のデータは(ip、freq)、(country、freq)に展開することができます。ここで、freqはIPストリーム(123.43.12.1など)がデータストリームに現れる時間の数です。

非常に自然なやり方は、さらに評価するために、各(キー、値)のペアを対応する子/リモートアクタに転送することです。たとえば、( '123.43.12.1'、1)がIP-Actorに送信されます。 ( 'US'、1)はCountry-Actorなどに送信されます。

システム全体が背圧になっていることを確認したいと思います。 IP-ActorとCountry-Actorの両方が平坦化されたペア( '123.43.12.1'、 '123.43.12.1')を処理した場合、イベント{ip: '123.43.12.1'、国: 'US'}は処理されたものとみなされるので、 12.1 '、1)、(' US '、1)。各アクタの処理速度が異なる場合があります(たとえば、IP-ActorはCountry-Actorよりもはるかに高速です)。その場合、ストリームを受け取ったマスタープロセスが、要求信号があるまで待機/ブロックします(両方のアクターがメールボックス内の既存データの処理を完了したときに発生します)。そうしないと、あるアクタはメールボックスにメッセージがいっぱいになるかもしれませんが(Country-Actor - slow one)、他のアクタメールボックスが空であるため(IP-Actor - 速いもの)、メッセージは引き続き入ります。

react-stream仕様でこのような機能が提供されている場合は、誰でもお勧めしますか?そうでなければ、最も効率的な方法で機能を達成するためにとにかくあります。

ありがとうございました。

答えて

0

あなたが記述したアクター間の同期のタイプは、あなたがアクターモデルで避けたいものです。どんな「待機/ブロック」も、反応プログラミングと反応プログラミングです。更新を行うには、単一のストリームFlowを使用することをお勧めします。

あなたはJSONデータを処理するための最初の必要性:

import akka.stream.scaladsl._ 

//your original source of json strings 
val jsonSrc : Source[String, NotUsed] = ??? 

case class JsonObject(ip : String, country : String) 

//use your favorite json parser 
def jsonParser(jsonStr : String) : JsonObject = ??? 

val parserFlow = Flow[String] map jsonParser 

次は、カウンタロジックを定義し、インクリメント値でカウンタを生成するFlow.scanを使用します。

最後に
type IPCounter = Map[String,Int] 
val emptyIPCounter = Map.empty[String,Int] withDefaultValue 0 

type CountryCounter = Map[String, Int] 
val emptyCountryCounter = Map.empty[String,Int] withDefaultValue 0 

type Counters = Tuple2[IPCounter, CountryCounter] 
val emptyCounters = (emptyIPCounter, emptyCountryCounter) 

def updateCounters(counters : Counters, jsonObj : JsonObject) : Counters = { 
    (counters._1.updated(jsonObj.ip, counters._1(jsonObj.ip) + 1), 
    counters._2.updated(jsonObj.country, counters._2(jsonObj.country) + 1)) 
} 

val counterFlow = Flow[JsonObject].scan(emptyCounters)(updateCounters) 

、一緒にすべてを兼ね備え:

val counterSource : Source[Counters, NotUsed] = jsonSrc via parserFlow via counterFlow 

結果はまさにあなたが求めているものです:すべてのカウンタが更新されたときにカウンタ値だけを転送するバックプレッシャストリーム。

関連する問題