私はKafka Queueからコンテンツを取り出し、いくつかの前処理と構造化の後にデータをMySQLテーブルに入れようとするSparkStreaming Appを構築しました。RDD toDF():誤った動作
私はSparkStreamingContextで 'foreachRDD'メソッドを呼び出します。私が直面している問題は、RDD上のsaveAsTextFileとformat( "csv")付きのDataFrameのwriteメソッドを呼び出す間のデータロスがあることです。なぜこれが起こっているのかを私は指摘することはできません。
val ssc = new StreamingContext(spark.sparkContext, Seconds(60))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
stream.foreachRDD {
rdd => {
rdd.saveAsTextFile("/Users/jarvis/rdds/"+new SimpleDateFormat("hh-mm-ss-dd-MM-yyyy").format(new Date)+"_rdd")
import spark.implicits._
val messagesDF = rdd.map(_.split("\t")).map(w => { Record (w(0), autoTag(w(1),w(4)) , w(2), w(3), w(4), w(5).substring(w(5).lastIndexOf("http://")), w(6).split("\n")(0))}).toDF("recordTS","tag","channel_url","title","description","link","pub_TS")
messagesDF.write.format("csv").save(dumpPath+new SimpleDateFormat("hh-mm-ss-dd-MM-yyyy").format(new Date)+"_DF")
}
}
ssc.start()
ssc.awaitTermination()
データが失われています。つまり、多くの行がRDDのDataFrameにデータを送信しません。 レプリケーションもあります。Dataframeに到達する行の多くは、何度も複製されます。
あなたができることは 'rdd'を' df'に変換してから 'text'ファイルだけでなく' csv'にも同じDFを書くことができます。 dfをテキストファイルに保存するには 'df.write.text(" file path ")' – Shankar
また、CSVとテキストファイルに書き込む前にDFを 'キャッシュ 'することができます。 – Shankar