2016-08-15 8 views
2

Spark DStreamの最後のN個のメッセージを蓄積する最適なソリューションを探しています。私はまた、保持するメッセージの数を指定したいと思います。Spark Streaming DStreamを蓄積する最適なソリューション

  1. updateStateByKey:

    Iteration New message Downstream 
    1   A   [A] 
    2   B   [A, B] 
    3   C   [A, B, C] 
    4   D   [B, C, D] 
    

    は、これまでのところ、私はDSTREAMに、以下の方法で探しています:

    たとえば、以下の流れを考えると、私は最後の3つの要素を保持したいのですが:すべてのメッセージが同じ鍵を持っているので、私はこれを行うことができます。しかし、これが鍵について何か知っている必要があるのか​​、ちょっと奇妙に見えます。

  2. mapWithState:ScalaではAPIは、このような単純なこと
  3. ウィンドウのためあまりにも面倒です。また、それは要素
  4. の最後の番号をウインドウの代わりのための時間値を必要とする、この仕事をしていないようですアキュムレータ:まだ実際には使用されていませんAccumulators in Spark docs

これを達成する最適なソリューションは何ですか?

答えて

1

mapWithStateはあなたが必要とする正確に何であり、それは間違いなく、あまりにも退屈ではありません。

case class Message(x: String) 
def statefulTransformation(key: Int, 
          value: Option[Message], 
          state: State[mutable.MutableList[Message]]): Option[Message] = { 
    def updateState(value: Message): Message = { 
    val updatedList = 
     state 
     .getOption() 
     .map(list => if (list.size > 3) list.drop(1) :+ value else list :+ value) 
     .getOrElse(mutable.MutableList(value)) 

    state.update(updatedList) 
    value 
    } 

    value.map(updateState) 
} 

そして今、あなたが必要なのは次のとおりです。

val stateSpec = StateSpec.function(statefulTransformation _) 
dStream.mapWithState(stateSpec) 

サイドノート - 私は、一定時間アペンド用mutable.MutableListを使用しました。

関連する問題