私はsessionization
ユースケースを持っています。私はmapWithstate()
のおかげで自分のセッションをin-memory
にして、それぞれの受信ログに対して更新します。セッションが終了し、特定のログで通知されたら、私はそれを取得し、私のState
から削除したいと思います。状態から削除されたデータを処理する方法
私がつまずく問題は、私は検索と検索がupdateFunction()
外で発生し、その中の除去は、つまり一度セッションを取得することはできません削除ので、各batch
の終わりに私のセッションを(remove()
)を削除し、することができないということですセッションが終了した場合、それ以上のログは存在してはならず、それ以上のログはありません。key
私はまだ終了したセッションを取得できますが、「死んだ」セッションの数が増加し、未確認のまま残すとシステム自体を脅かす統合異常(「State
-overflow」)が発生します。この解決策は受け入れられません。
一般的な使用例のように、誰かが解決策を思い付いたのだろうかと思っていましたか?
EDIT以下
サンプルコード:
def mapWithStateContainer(iResultParsing: DStream[(String, SessionEvent)]) = {
val lStateSpec = StateSpec.function(stateUpdateFunction _).timeout(Seconds(TIMEOUT)
val lResultMapWithState: DStream[(String, Session)] =
iResultParsing.mapWithState(lStateSpec).stateSnapshots()
val lClosedSession: DStream[(String, Session)] =
lResultMapWithState.filter(_._2.mTimeout)
//ideally remove here lClosedSession from the state
}
private def stateUpdateFunction(iKey: String,
iValue: Option[SessionEvent],
iState: State[Session]): Option[(String, Session)] = {
var lResult = None: Option[(String, Session)]
if (iState.isTimingOut()) {
val lClosedSession = iState.get()
lClosedSession.mTimeout = true
lResult = Some(iKey, lClosedSession)
} else if (iState.exists) {
val lUpdatedSession = updateSession(lCurrentSession, iValue)
iState.update(lUpdatedSession)
lResult = Some(iKey, lUpdatedSession)
// we wish to remove the lUpdatedSession from the state once retrieved with lResult
/*if (lUpdatedSession.mTimeout) {
iState.remove()
lResult = None
}*/
} else {
val lInitialState = initSession(iValue)
iState.update(lInitialState)
lResult = Some(iKey, lInitialState)
}
lResult
}
private def updateSession(iCurrentSession: Session,
iNewData: Option[SessionEvent]): Session = {
//user disconnects manually
if (iNewData.get.mDisconnection) {
iCurrentSession.mTimeout = true
}
iCurrentSession
}
*私が直面している問題は、取得がupdateFunction()の外で行われ、その中での削除が発生するため、各バッチの最後にセッションを取得して削除(削除)できないことです。それが何を意味するのかを示す例は? 「アクセスは更新機能の外にある」とはどういう意味ですか? –
コードが編集されました。それは今はっきりしていますか? – wipman
間違いなく。下の私の答えを見てください。 –