0
私は、ユーザーデータの負荷が大きいです。私はそれが新しいユーザーかどうかをIDで判断したいと思います。 dbへの呼び出しを減らすために、私は以前のユーザーの状態を維持します。ストリーム内の状態を維持する
val users = mutable.set[String]()
//init the state from db
user = db.getAllUsersIds()
val source: Source[User, NotUsed]
val dbSink: Sink[User, NotUsed] //goes to db
//if the user is added to the set it will return true
val usersFilter = Flow[User].filter(user => users.add(user.id))
今私は私の問題は、可変状態が共有し、安全ではないということです
source ~> usersFilter ~> dbSink
グラフを作成することができます。フロー内で状態を維持するオプションはありますか?
あなたの提案に重大な欠陥があります。 dbに多くのioが必要な要求が多々必要です。私はむしろそれを記憶に残しておきます(私は自分の投稿を更新します) – igx
少なくとも私の場合は、DBへの呼び出しを1回だけ行い、単一のクエリの既存のIDをすべてリストにロードします。実行時にフロー内のリスト(またはマップ)を参照するだけです。 –
あなたはこれのような何かがうまくいくと思いますか? 例えば 'DEF alreadyExists = { ヴァルalreadyExistingIds = mutable.set [文字列](DBからINIT) フロー[ユーザー] .filterNot(ユーザー=> alreadyExistingIds.add(user.id)) – igx