2016-05-20 4 views
3

Spark 1.6.1にアップグレードした後、updateStateByKeymapWithStateに置き換えるようにアプリケーションのリファクタリングを開始しました。spark mapWithState updated states出力

新しいAPIのパフォーマンス上の利点を利用するために、すべての状態をロードするstateSnapshotsに電話する必要はありません。私は更新された状態だけを必要とします。

mapWithState APIはDStream[key, input, state, output]を返します。各状態は、入力が取り込まれた後に部分的に更新された状態です。このDStream(つまり、対応するすべての入力がインジェスト/マップされた後の状態)から最新の状態だけを抽出するにはどうすればよいですか?

私は、MapWithStateDStreammap(入力と出力をドロップする)とreduceByKeyを行う(私は更新関数の内部で設定)より新しいタイムスタンプを持つ状態を選択することが、私はそこではないという保証を持つことはできません同じタイムスタンプを持つ2つの部分的な状態。たとえカスタム、キー、パーティショナーを使用しています。

出力MapWithStateDStreamの最新の部分状態を確認するには、mapWithStateを使用しますか?

答えて

3

mapWithStateは、現在のマイクロバッチで更新されている各状態に対してのみ呼び出されます。あなたが望むものを達成する1つの方法は、状態が更新された場合にSome[S]を返すことです。

StateSpec.functionは、次のシグネチャを持つメソッドをとります。

mappingFunction: 
    (Time, KeyType, Option[ValueType], State[StateType]) => Option[MappedType] 

私たちは何ができるか、私たちOption[MappedType]がそうでなければNone、値が更新されたとき、常にSome[MappedType]であることを確認しています。

例えば

def updateState(key: Int, value: Option[Int], state: State[Int]): Option[Int] = { 
    value match { 
     case Some(something) if something > 10 => 
     val updatedVal = something * something 
     state.update(updatedVal) 
     Some(updatedVal) 
     case _ => None 
    } 
} 

そして、あなたが行うことができます:

val spec = StateSpec.function(updateState _) 
ssc.mapWithState(spec).filter(!_.isEmpty).foreachRDD(/* do stuff on updated state */) 

あなたはどのなし更新された状態をフィルタリングし、あなたが探しているだけで、更新のスナップショットを維持するこの方法です。

+0

状態は常に更新されます。入力アクションごとに状態が更新されるため、常にSome(updatedState)を返します。これは、出力にすべての部分的な状態が含まれていることを意味します。関連するすべてのアクションが処理された後に出力します。私は、同じ鍵に対応する複数の行動があると述べるべきだったと思います。 'mapWithState'はすべての状態ではなく、すべてのアクションに対して呼び出されます。同じ状態に対して複数回呼び出すことができます。 – Sepph

+0

@Seppehあなたのコードの[MCVE]を投稿できますか?特にSpark DAG。 'State [S] 'を更新する限り、同じキーに対して複数のアクションがあっても、' updateStateByKey'のように値を出力する必要はありません。 –

0

更新アルゴリズムで可能な場合は、mapWithstateを呼び出す前に入力ストリームのreduceByKeyを呼び出すことができます。次に、各キーに対して1つの更新のみがあり、部分的な状態出力はありません。

関連する問題