2017-10-16 19 views
0

私は、ユーザーデータの負荷が大きいです。私はそれが新しいユーザーかどうかをIDで判断したいと思います。 dbへの呼び出しを減らすために、私は以前のユーザーの状態を維持します。ストリーム内の状態を維持する

val users = mutable.set[String]() 
//init the state from db 
user = db.getAllUsersIds() 
val source: Source[User, NotUsed] 
val dbSink: Sink[User, NotUsed] //goes to db 
//if the user is added to the set it will return true 
val usersFilter = Flow[User].filter(user => users.add(user.id)) 

今私は私の問題は、可変状態が共有し、安全ではないということです

source ~> usersFilter ~> dbSink 

グラフを作成することができます。フロー内で状態を維持するオプションはありますか?

答えて

0

これには2通りの方法があります。

レコードのストリームを取得していて、ストリームを重複排除したい場合(一部のIDはすでに処理されているため)。あなたはこれを行うための他の方法は、IDが既に存在するかどうかをチェックデータベース検索経由で

http://janschulte.com/2016/03/08/deduplicate-akka-stream/

を行うことができます。

val alreadyExists : Flow[User, NotUsed] = { 
    // build a cache of known ids 
    val knownIdList = ... // query database and get list of IDs 
    Flow[User].filterNot(user => knownIdList.contains(user.id)) 
} 
+0

あなたの提案に重大な欠陥があります。 dbに多くのioが必要な要求が多々必要です。私はむしろそれを記憶に残しておきます(私は自分の投稿を更新します) – igx

+0

少なくとも私の場合は、DBへの呼び出しを1回だけ行い、単一のクエリの既存のIDをすべてリストにロードします。実行時にフロー内のリスト(またはマップ)を参照するだけです。 –

+0

あなたはこれのような何かがうまくいくと思いますか? 例えば 'DEF alreadyExists = { ヴァルalreadyExistingIds = mutable.set [文字列](DBからINIT) フロー[ユーザー] .filterNot(ユーザー=> alreadyExistingIds.add(user.id)) – igx

関連する問題