2016-04-08 11 views
3

2つのDStream間でスパークストリーミング状態を共有できますか?2つのストリーム間のSparkストリーミング共有状態

基本的には、最初のストリームを使用して状態を作成/更新し、状態を使用して2番目のストリームを強化したいとします。

例:StatefulNetworkWordCountの例を変更しました。私は最初のストリームを使って状態を作り、最初のストリームのカウントで2番目のストリームを豊かにしています。

val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1))) 


val mappingFuncForFirstStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => { 
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0) 
    val output = (word, sum) 
    state.update(sum) 

    Some(output) 
} 

val mappingFuncForSecondStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => { 
    val sum = state.getOption.getOrElse(0) 
    val output = (word, sum) 

    Some(output) 
} 



// first stream 
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 
    .flatMap(r=>r._2.split(" ")) 
    .map(x => (x, 1)) 
    .mapWithState(StateSpec.function(mappingFuncForFirstStream).initialState(initialRDD).timeout(Minutes(10))) 
    .print(1) 



// second stream 
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams2, mergeTopicSet) 
    .flatMap(r=>r._2.split(" ")) 
    .map(x => (x, 1)) 
    .mapWithState(StateSpec.function(mappingFuncForSecondStream).initialState(initialRDD).timeout(Minutes(10))) 
    .print(50) 

チェックポイントディレクトリには、2つの異なる状態RDDがあります。

私はこの方法では、あなたのための役に立つかもしれ火花1.6.1を使用し、カフカ-0.8.2.1

答えて

-1

ています:の基礎となるStateDStreamにアクセスすることが可能です

ssc.untion(Seq[Dstream[T]]) 
+0

ありがとう@bug_xshguo、これは良い考えだとは思わない。これらのストリームに対して異なる変換を実行したいと思います。 – banjara

1

を使用してmapWithState操作を適用した結果のストリーム例:

val firstDStream = ??? 
val secondDStream = ??? 
val firstDStreamSMapped = firstDStream..mapWithState(...) 
val firstStreamState = firstDStreamSMapped.snapshotStream() 
// we want to use the state of Stream 1 to enrich Stream 2. The keys of both streams are required to match. 
val enrichedStream = secondDStream.join(firstStreamState) 
... do stuff with enrichedStream ... 
+0

これはお返事ありがとうございます、これは面白いアプローチですが、私の場合は状態が巨大になり、私はすべてのマイクロバッチに対して状態全体を反復したくありません。 – banjara

+0

@shekharので、あるDStreamからStateDStreamを直接ハックして2ndにする必要がありますか?それができるかどうかを調べるには、ソースに潜入する必要があると思います。 – maasg

関連する問題