2016-10-27 30 views
2

私はKafka Queueからコンテンツを取り出し、いくつかの前処理と構造化の後にデータをMySQLテーブルに入れようとするSparkStreaming Appを構築しました。RDD toDF():誤った動作

私はSparkStreamingContextで 'foreachRDD'メソッドを呼び出します。私が直面している問題は、RDD上のsaveAsTextFileとformat( "csv")付きのDataFrameのwriteメソッドを呼び出す間のデータロスがあることです。なぜこれが起こっているのかを私は指摘することはできません。

val ssc = new StreamingContext(spark.sparkContext, Seconds(60)) 
ssc.checkpoint("checkpoint") 

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap 
val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) 
    stream.foreachRDD { 
    rdd => { 
    rdd.saveAsTextFile("/Users/jarvis/rdds/"+new SimpleDateFormat("hh-mm-ss-dd-MM-yyyy").format(new Date)+"_rdd") 

    import spark.implicits._ 

     val messagesDF = rdd.map(_.split("\t")).map(w => { Record (w(0), autoTag(w(1),w(4)) , w(2), w(3), w(4), w(5).substring(w(5).lastIndexOf("http://")), w(6).split("\n")(0))}).toDF("recordTS","tag","channel_url","title","description","link","pub_TS") 

     messagesDF.write.format("csv").save(dumpPath+new SimpleDateFormat("hh-mm-ss-dd-MM-yyyy").format(new Date)+"_DF") 
     } 
    } 

    ssc.start() 
    ssc.awaitTermination() 

データが失われています。つまり、多くの行がRDDのDataFrameにデータを送信しません。 レプリケーションもあります。Dataframeに到達する行の多くは、何度も複製されます。

+0

あなたができることは 'rdd'を' df'に変換してから 'text'ファイルだけでなく' csv'にも同じDFを書くことができます。 dfをテキストファイルに保存するには 'df.write.text(" file path ")' – Shankar

+0

また、CSVとテキストファイルに書き込む前にDFを 'キャッシュ 'することができます。 – Shankar

答えて

0

エラーを見つけました。実際には、取り込まれたデータ形式について間違った理解がありました。

目的のデータは "\ t \ t \ t ..."なので、行は "\ n"に分割されていると考えられます。だったが、実際のデータ

: "\ T \ T \ T ... \ N \ T \ T \ T ... \ n" はそう

rdd.map(...)操作すべての "\ n"で分割する別のマップが必要でした