2016-05-26 15 views
2

私はsessionizationユースケースを持っています。私はmapWithstate()のおかげで自分のセッションをin-memoryにして、それぞれの受信ログに対して更新します。セッションが終了し、特定のログで通知されたら、私はそれを取得し、私のStateから削除したいと思います。状態から削除されたデータを処理する方法

私がつまずく問題は、私は検索と検索がupdateFunction()外で発生し、その中の除去は、つまり一度セッションを取得することはできません削除ので、各batchの終わりに私のセッションを(remove())を削除し、することができないということですセッションが終了した場合、それ以上のログは存在してはならず、それ以上のログはありません。key

私はまだ終了したセッションを取得できますが、「死んだ」セッションの数が増加し、未確認のまま残すとシステム自体を脅かす統合異常(「State -overflow」)が発生します。この解決策は受け入れられません。

一般的な使用例のように、誰かが解決策を思い付いたのだろうかと思っていましたか?


EDIT以下

サンプルコード:

def mapWithStateContainer(iResultParsing: DStream[(String, SessionEvent)]) = { 
    val lStateSpec = StateSpec.function(stateUpdateFunction _).timeout(Seconds(TIMEOUT) 

    val lResultMapWithState: DStream[(String, Session)] = 
     iResultParsing.mapWithState(lStateSpec).stateSnapshots() 

    val lClosedSession: DStream[(String, Session)] = 
     lResultMapWithState.filter(_._2.mTimeout) 

    //ideally remove here lClosedSession from the state 
} 

private def stateUpdateFunction(iKey: String, 
           iValue: Option[SessionEvent], 
           iState: State[Session]): Option[(String, Session)] = { 
    var lResult = None: Option[(String, Session)] 

    if (iState.isTimingOut()) { 
    val lClosedSession = iState.get() 
    lClosedSession.mTimeout = true 

    lResult = Some(iKey, lClosedSession) 
    } else if (iState.exists) { 
     val lUpdatedSession = updateSession(lCurrentSession, iValue) 
     iState.update(lUpdatedSession) 

     lResult = Some(iKey, lUpdatedSession) 

     // we wish to remove the lUpdatedSession from the state once retrieved with lResult 
     /*if (lUpdatedSession.mTimeout) { 
     iState.remove() 
     lResult = None 
     }*/ 
    } else { 
     val lInitialState = initSession(iValue) 
     iState.update(lInitialState) 

     lResult = Some(iKey, lInitialState) 
    } 

    lResult 
} 

private def updateSession(iCurrentSession: Session, 
          iNewData: Option[SessionEvent]): Session = { 
    //user disconnects manually 
    if (iNewData.get.mDisconnection) { 
     iCurrentSession.mTimeout = true 
    } 

    iCurrentSession 
} 
+0

*私が直面している問題は、取得がupdateFunction()の外で行われ、その中での削除が発生するため、各バッチの最後にセッションを取得して削除(削除)できないことです。それが何を意味するのかを示す例は? 「アクセスは更新機能の外にある」とはどういう意味ですか? –

+0

コードが編集されました。それは今はっきりしていますか? – wipman

+0

間違いなく。下の私の答えを見てください。 –

答えて

0

代わりMapWithStateRDD.stateSnapshotを呼ぶのを、あなたはあなたのmapWithState操作の戻り値として更新された状態を返すことができます。このように、ファイナライズされた状態は、常にステートフルなDStreamの外部で使用できます。

else if (iState.exists) { 
    val lUpdatedSession = updateSession(lCurrentSession, iValue) 
    iState.update(lUpdatedSession) 

    if (lUpdatedSession.mTimeout) { 
    iState.remove() 
    } 

    Some(iKey, lUpdatedSession) 
} 

を今にあなたのグラフを変更します:

状態は内部を を削除されているということになりましたが、何が起こる
val lResultMapWithState = iResultParsing 
          .mapWithState(lStateSpec) 
          .filter { case (_, session) => session.mTimeout } 

ますので、」

この

はあなたが行うことができることを意味しあなたのStateSpec機能から返品します。それ以上の処理のためにあなたの外に用意されています。

関連する問題