私は、カフカからデータを連続的に受信できるstateful-wordCountスパークストリーミングアプリケーションを作成します。私のコードにはmapWithState
関数が含まれており、正しく実行できます。スパークUIでストリーミング統計を確認すると、周期パルスがの処理時間にあります。私はこれがチェックポイントの使用によって引き起こされると思います。誰かがこれを説明できることを願って、すばらしいおかげで!スパークストリーミングでmapWithState/checkpointを使用しているときに、定期的なパルスが処理されるタイムチャートが表示されるのはなぜですか?
と完成したバッチテーブル:
私はいくつかの1秒のタイムコストのバッチはperiodicly起こる見つけます。次に、1秒間コストバッチと1秒未満コストバッチに進み、1秒間コストバッチでもう1つのジョブが実行されていることがわかりました。バッチの2種類を比較
をcheckpoint
によって引き起こされているように見えるが、私はよく分かりません。
誰でも私のために詳しく説明できますか?ありがとう!
は、ここに私のコードです:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
object StateApp {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println(
s"""
|Usage: KafkaSpark_008_test <brokers> <topics> <batchDuration>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more kafka topics to consume from
| <batchDuration> is the batch duration of spark streaming
| <checkpointPath> is the checkpoint directory
""".stripMargin)
System.exit(1)
}
val Array(brokers, topics, bd, cpp) = args
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("KafkaSpark_080_test")
val ssc = new StreamingContext(sparkConf, Seconds(bd.toInt))
ssc.checkpoint(cpp)
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
// test the messages' receiving speed
messages.foreachRDD(rdd =>
println(System.currentTimeMillis() + "\t" + System.currentTimeMillis()/1000 + "\t" + (rdd.count()/bd.toInt).toString))
// the messages' value type is "timestamp port word", eg. "1479700000000 10105 ABC"
// wordDstream: (word, 1), eg. (ABC, 1)
val wordDstream = messages.map(_._2).map(msg => (msg.split(" ")(2), 1))
// this is from Spark Source Code example in Streaming/StatefulNetworkWordCount.scala
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
val stateDstream = wordDstream.mapWithState(
StateSpec.function(mappingFunc)).print()
// Start the computation
ssc.start()
ssc.awaitTermination() }
}
ありがとうございました!では、乗数の値を設定するにはどうしたらいいですか?これの実装ソースコードはどこで確認できますか? –
@Chenghao [こちら](http://stackoverflow.com/questions/36042295/spark-streaming-mapwithstate-seems-to-rebuild-complete-state-periodically/36065778#36065778)の回答をご覧ください。 –
私はそれを手に入れました!ありがとうございました! –