2016-12-05 3 views
1

私は、カフカからデータを連続的に受信できるstateful-wordCountスパークストリーミングアプリケーションを作成します。私のコードにはmapWithState関数が含まれており、正しく実行できます。スパークUIでストリーミング統計を確認すると、周期パルスがの処理時間にあります。私はこれがチェックポイントの使用によって引き起こされると思います。誰かがこれを説明できることを願って、すばらしいおかげで!スパークストリーミングでmapWithState/checkpointを使用しているときに、定期的なパルスが処理されるタイムチャートが表示されるのはなぜですか?

The Streaming Statistics

と完成したバッチテーブル:

batches processing time

私はいくつかの1秒のタイムコストのバッチはperiodicly起こる見つけます。次に、1秒間コストバッチと1秒未満コストバッチに進み、1秒間コストバッチでもう1つのジョブが実行されていることがわかりました。バッチの2種類を比較

1-second-time-cost batch subsecond-time-cost batch

checkpointによって引き起こされているように見えるが、私はよく分かりません。

誰でも私のために詳しく説明できますか?ありがとう!

は、ここに私のコードです:

import kafka.serializer.StringDecoder 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka._ 
import org.apache.spark.SparkConf 

object StateApp { 

    def main(args: Array[String]) { 

    if (args.length < 4) { 
     System.err.println(
     s""" 
      |Usage: KafkaSpark_008_test <brokers> <topics> <batchDuration> 
      | <brokers> is a list of one or more Kafka brokers 
      | <topics> is a list of one or more kafka topics to consume from 
      | <batchDuration> is the batch duration of spark streaming 
      | <checkpointPath> is the checkpoint directory 
     """.stripMargin) 
     System.exit(1) 
    } 

    val Array(brokers, topics, bd, cpp) = args 

    // Create context with 2 second batch interval 
    val sparkConf = new SparkConf().setAppName("KafkaSpark_080_test") 
    val ssc = new StreamingContext(sparkConf, Seconds(bd.toInt)) 

    ssc.checkpoint(cpp) 

    // Create direct kafka stream with brokers and topics 
    val topicsSet = topics.split(",").toSet 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topicsSet) 

    // test the messages' receiving speed 
    messages.foreachRDD(rdd => 
     println(System.currentTimeMillis() + "\t" + System.currentTimeMillis()/1000 + "\t" + (rdd.count()/bd.toInt).toString)) 

    // the messages' value type is "timestamp port word", eg. "1479700000000 10105 ABC" 
    // wordDstream: (word, 1), eg. (ABC, 1) 
    val wordDstream = messages.map(_._2).map(msg => (msg.split(" ")(2), 1)) 

    // this is from Spark Source Code example in Streaming/StatefulNetworkWordCount.scala 
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => { 
     val sum = one.getOrElse(0) + state.getOption.getOrElse(0) 
     val output = (word, sum) 
     state.update(sum) 
     output 
    } 

    val stateDstream = wordDstream.mapWithState(
     StateSpec.function(mappingFunc)).print() 

    // Start the computation 
    ssc.start() 
    ssc.awaitTermination() } 

} 

答えて

1

あなたが永続ストレージにデータをチェックポイントによって引き起こされる参照これらの小さなスパイクが。 Sparkが状態を完全に変換するには、障害が発生した場合に回復できるように、定義された間隔ごとにデータを確実に保存する必要があります。

スパイクは50秒ごとに実行されるため、時間的に一貫していることに注意してください。この計算は:(batch time * default multiplier)です。現在のデフォルトの乗数は10です。あなたの場合、これは5 * 10 = 50で、スパイクが50秒ごとに表示される理由を説明しています。

+0

ありがとうございました!では、乗数の値を設定するにはどうしたらいいですか?これの実装ソースコードはどこで確認できますか? –

+0

@Chenghao [こちら](http://stackoverflow.com/questions/36042295/spark-streaming-mapwithstate-seems-to-rebuild-complete-state-periodically/36065778#36065778)の回答をご覧ください。 –

+0

私はそれを手に入れました!ありがとうございました! –

関連する問題