2つのDStream間でスパークストリーミング状態を共有できますか?2つのストリーム間のSparkストリーミング共有状態
基本的には、最初のストリームを使用して状態を作成/更新し、状態を使用して2番目のストリームを強化したいとします。
例:StatefulNetworkWordCountの例を変更しました。私は最初のストリームを使って状態を作り、最初のストリームのカウントで2番目のストリームを豊かにしています。
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
val mappingFuncForFirstStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
Some(output)
}
val mappingFuncForSecondStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
val sum = state.getOption.getOrElse(0)
val output = (word, sum)
Some(output)
}
// first stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
.flatMap(r=>r._2.split(" "))
.map(x => (x, 1))
.mapWithState(StateSpec.function(mappingFuncForFirstStream).initialState(initialRDD).timeout(Minutes(10)))
.print(1)
// second stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams2, mergeTopicSet)
.flatMap(r=>r._2.split(" "))
.map(x => (x, 1))
.mapWithState(StateSpec.function(mappingFuncForSecondStream).initialState(initialRDD).timeout(Minutes(10)))
.print(50)
チェックポイントディレクトリには、2つの異なる状態RDDがあります。
私はこの方法では、あなたのための役に立つかもしれ火花1.6.1を使用し、カフカ-0.8.2.1
ありがとう@bug_xshguo、これは良い考えだとは思わない。これらのストリームに対して異なる変換を実行したいと思います。 – banjara