私はKafkaからのデータを処理するためにSparkストリーミングを使用しています。そして結果をファイル(ローカル)に書きたいと思います。私がコンソールで印刷すると、すべて正常に動作し、結果が得られますが、ファイルに書き込むときにエラーが発生します。Scala:foreachRDD内のファイルに書き込む
私はそれを行うためにPrintWriter
を使用しますが、私はこのエラーを取得する:
Exception in thread "main" java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
java.io.PrintWriter
Serialization stack:
- object not serializable (class: java.io.PrintWriter, value: [email protected])
- field (class: streaming.followProduction$$anonfun$main$1, name: qualityWriter$1, type: class java.io.PrintWriter)
- object (class streaming.followProduction$$anonfun$main$1, <function1>)
- field (class: streaming.followProduction$$anonfun$main$1$$anonfun$apply$1, name: $outer, type: class streaming.followProduction$$anonfun$main$1)
- object (class streaming.followProduction$$anonfun$main$1$$anonfun$apply$1, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.kafka010.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData,
私はForeachRDD内でこのようにライターを使用することはできませんね!ここで
が私のコードです:
object followProduction extends Serializable {
def main(args: Array[String]) = {
val qualityWriter = new PrintWriter(new File("diskQuality.txt"))
qualityWriter.append("dateTime , quality , status \n")
val sparkConf = new SparkConf().setMaster("spark://address:7077").setAppName("followProcess").set("spark.streaming.concurrentJobs", "4")
val sc = new StreamingContext(sparkConf, Seconds(10))
sc.checkpoint("checkpoint")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "address:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> s"${UUID.randomUUID().toString}",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("A", "C")
topics.foreach(t => {
val stream = KafkaUtils.createDirectStream[String, String](
sc,
PreferConsistent,
Subscribe[String, String](Array(t), kafkaParams)
)
stream.foreachRDD(rdd => {
rdd.collect().foreach(i => {
val record = i.value()
val newCsvRecord = process(t, record)
println(newCsvRecord)
qualityWriter.append(newCsvRecord)
})
})
})
qualityWriter.close()
sc.start()
sc.awaitTermination()
}
var componentQuantity: componentQuantity = new componentQuantity("", 0.0, 0.0, 0.0)
var diskQuality: diskQuality = new diskQuality("", 0.0)
def process(topic: String, record: String): String = topic match {
case "A" => componentQuantity.checkQuantity(record)
case "C" => diskQuality.followQuality(record)
}
}
私はこのクラスを持っている私が呼び出しています:
case class diskQuality(datetime: String, quality: Double) extends Serializable {
def followQuality(record: String): String = {
val dateFormat: SimpleDateFormat = new SimpleDateFormat("dd-mm-yyyy hh:mm:ss")
var recQuality = msgParse(record).quality
var date: Date = dateFormat.parse(msgParse(record).datetime)
var recDateTime = new SimpleDateFormat("dd-mm-yyyy hh:mm:ss").format(date)
// some operations here
return recDateTime + " , " + recQuality
}
def msgParse(value: String): diskQuality = {
import org.json4s._
import org.json4s.native.JsonMethods._
implicit val formats = DefaultFormats
val res = parse(value).extract[diskQuality]
return res
}
}
どのように私はこれを達成することができますか?私はSparkとScalaの両方に新しいので、おそらく私は正しいことをしていません。 はお時間をいただき、ありがとうございます
編集:私は私のコードを変更したと私はもう、このエラーを取得しない
。しかし、同時に、私はファイル内の最初の行だけを持ち、レコードは追加されません。内部の作家(handleWriter)は実際には動作しません。ここで
は私のコードです:
私は欠場でしたstream.foreachRDD(rdd => {
val qualityWriter = new PrintWriter(file)
qualityWriter.write("dateTime , quality , status \n")
qualityWriter.close()
rdd.collect().foreach(i =>
{
val record = i.value()
val newCsvRecord = process(topic , record)
val handleWriter = new PrintWriter(file)
handleWriter.append(newCsvRecord)
handleWriter.close()
println(newCsvRecord)
})
})
?
stream.foreachRDD(rdd => {
val qualityWriter = new PrintWriter(new File("diskQuality.txt"))
qualityWriter.append("dateTime , quality , status \n")
rdd.collect().foreach(i => {
val record = i.value()
val newCsvRecord = process(t, record)
qualityWriter.append(newCsvRecord)
})
})
})