2017-06-20 16 views
1

私はKafkaからのデータを処理するためにSparkストリーミングを使用しています。そして結果をファイル(ローカル)に書きたいと思います。私がコンソールで印刷すると、すべて正常に動作し、結果が得られますが、ファイルに書き込むときにエラーが発生します。Scala:foreachRDD内のファイルに書き込む

私はそれを行うためにPrintWriterを使用しますが、私はこのエラーを取得する:

Exception in thread "main" java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable 
java.io.PrintWriter 
Serialization stack: 
    - object not serializable (class: java.io.PrintWriter, value: [email protected]) 
    - field (class: streaming.followProduction$$anonfun$main$1, name: qualityWriter$1, type: class java.io.PrintWriter) 
    - object (class streaming.followProduction$$anonfun$main$1, <function1>) 
    - field (class: streaming.followProduction$$anonfun$main$1$$anonfun$apply$1, name: $outer, type: class streaming.followProduction$$anonfun$main$1) 
    - object (class streaming.followProduction$$anonfun$main$1$$anonfun$apply$1, <function1>) 
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1) 
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>) 
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData) 
    - object (class org.apache.spark.streaming.kafka010.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData, 

私はForeachRDD内でこのようにライターを使用することはできませんね!ここで

が私のコードです:

object followProduction extends Serializable { 

    def main(args: Array[String]) = { 

    val qualityWriter = new PrintWriter(new File("diskQuality.txt")) 
    qualityWriter.append("dateTime , quality , status \n") 

    val sparkConf = new SparkConf().setMaster("spark://address:7077").setAppName("followProcess").set("spark.streaming.concurrentJobs", "4") 
    val sc = new StreamingContext(sparkConf, Seconds(10)) 

    sc.checkpoint("checkpoint") 

    val kafkaParams = Map[String, Object](
     "bootstrap.servers" -> "address:9092", 
     "key.deserializer" -> classOf[StringDeserializer], 
     "value.deserializer" -> classOf[StringDeserializer], 
     "group.id" -> s"${UUID.randomUUID().toString}", 
     "auto.offset.reset" -> "earliest", 
     "enable.auto.commit" -> (false: java.lang.Boolean) 
    ) 

    val topics = Array("A", "C") 

    topics.foreach(t => { 

     val stream = KafkaUtils.createDirectStream[String, String](
     sc, 
     PreferConsistent, 
     Subscribe[String, String](Array(t), kafkaParams) 
    ) 

     stream.foreachRDD(rdd => { 

     rdd.collect().foreach(i => { 

      val record = i.value() 
      val newCsvRecord = process(t, record) 

      println(newCsvRecord) 

      qualityWriter.append(newCsvRecord) 

     }) 
     }) 

    }) 

    qualityWriter.close() 

    sc.start() 
    sc.awaitTermination() 

    } 

    var componentQuantity: componentQuantity = new componentQuantity("", 0.0, 0.0, 0.0) 
    var diskQuality: diskQuality = new diskQuality("", 0.0) 

    def process(topic: String, record: String): String = topic match { 
    case "A" => componentQuantity.checkQuantity(record) 
    case "C" => diskQuality.followQuality(record) 
    } 
} 

私はこのクラスを持っている私が呼び出しています:

case class diskQuality(datetime: String, quality: Double) extends Serializable { 

    def followQuality(record: String): String = { 

    val dateFormat: SimpleDateFormat = new SimpleDateFormat("dd-mm-yyyy hh:mm:ss") 


    var recQuality = msgParse(record).quality 
    var date: Date = dateFormat.parse(msgParse(record).datetime) 
    var recDateTime = new SimpleDateFormat("dd-mm-yyyy hh:mm:ss").format(date) 

    // some operations here 

    return recDateTime + " , " + recQuality 

    } 

    def msgParse(value: String): diskQuality = { 

    import org.json4s._ 
    import org.json4s.native.JsonMethods._ 

    implicit val formats = DefaultFormats 

    val res = parse(value).extract[diskQuality] 
    return res 

    } 
} 

どのように私はこれを達成することができますか?私はSparkとScalaの両方に新しいので、おそらく私は正しいことをしていません。 はお時間をいただき、ありがとうございます

編集:私は私のコードを変更したと私はもう、このエラーを取得しない

。しかし、同時に、私はファイル内の最初の行だけを持ち、レコードは追加されません。内部の作家(handleWriter)は実際には動作しません。ここで

は私のコードです:

私は欠場でした
stream.foreachRDD(rdd => { 

    val qualityWriter = new PrintWriter(file) 
    qualityWriter.write("dateTime , quality , status \n") 
    qualityWriter.close() 

    rdd.collect().foreach(i => 
    { 
     val record = i.value() 

     val newCsvRecord = process(topic , record) 

     val handleWriter = new PrintWriter(file) 
     handleWriter.append(newCsvRecord) 
     handleWriter.close() 

     println(newCsvRecord) 
    }) 
    }) 

stream.foreachRDD(rdd => { 
    val qualityWriter = new PrintWriter(new File("diskQuality.txt")) 
    qualityWriter.append("dateTime , quality , status \n") 

    rdd.collect().foreach(i => { 
    val record = i.value() 
    val newCsvRecord = process(t, record) 
    qualityWriter.append(newCsvRecord) 
    }) 
    }) 
}) 

答えて

2

:たぶん私は、最も簡単な方法は、それが機能の閉鎖によって捕獲されないことを意味する、foreachRDDPrintWriterのインスタンスを作成することです。この間違った...

1

をやっていますPrintWriterはローカルリソースであり、単一のマシンにバインドされており、シリアル化することはできません。

このオブジェクトをJava直列化計画から削除するには、@transientと宣言できます。つまり、followProductionオブジェクトのシリアル化フォームはこのフィールドをシリアル化しようとしません。質問のコードで

、のように宣言する必要があります。そして、それはforeachRDDクロージャ内で使用することが可能となる

@transient val qualityWriter = new PrintWriter(new File("diskQuality.txt")) 

ただし、このプロセスでは、ファイルの適切な処理に関連する問題は解決されません。 qualityWriter.close()はストリーミングジョブの最初のパスで実行され、ファイルディスクリプタはジョブの実行中に書き込み用に閉じられます。 Fileなどのローカルリソースを適切に使用するには、foreachRDDクロージャ内のPrintWriterを再作成するためにYuvalの提案に従います。欠けている部分は、追加モードで新しいPrintWritterを宣言しています。

「spark.streaming.concurrentJobs」:問題のコードについて

// Initialization phase 

val qualityWriter = new PrintWriter(new File("diskQuality.txt")) 
qualityWriter.println("dateTime , quality , status") 
qualityWriter.close() 

.... 

dstream.foreachRDD{ rdd => 

    val data = rdd.map(e => e.value()) 
       .collect() // get the data locally 
       .map(i=> process(topic , i)) // create csv records 
    val allRecords = data.mkString("\n") // why do I/O if we can do in-mem?  
    val handleWriter = new PrintWriter(file, append=true) 
    handleWriter.append(allRecords) 
    handleWriter.close() 

} 

ほとんどのノート:foreachRDD内の変更されたコードは、(いくつかの追加のコードの変更を行う)、このようになります。 、 "4"

これにより、複数のスレッドが同じローカルファイルに書き込む際に問題が発生します。これはおそらくこの文脈で誤用されているでしょう。

sc.checkpoint( "チェックポイント")

このジョブにチェックポイントは必要はないように思えます。

関連する問題