2017-04-19 12 views
0

カフカのトピックを通じたデータストリームがあります。私はSpark Streamingを使ってそれを読んだ。スパークストリーミング - Kafka- createStream - データフレームへのRDD

val ssc = new StreamingContext(l_sparkcontext, Seconds(30)) 
    val kafkaStream = KafkaUtils.createStream(ssc, "xxxx.xx.xx.com:2181", "new-spark-streaming-group", Map("event_log" -> 10)) 

これはうまく動作します。私が望むのは、ストリームデータに列を割り当てることによって、これを寄木細工ファイルに書き込むことです。したがって、私は、次の

kafkaStream.foreachRDD(rdd => { 
    if (rdd.count() == 0) { 
    println("No new SKU's received in this time interval " + Calendar.getInstance().getTime()) 
    } 
    else { 
    println("No of SKUs received " + rdd.count()) 
    rdd.map(record => { 
     record._2 
    }).toDF("customer_id","sku","type","event","control_group","event_date").write.mode(SaveMode.Append).format("parquet").save(outputPath) 

は、しかし、これは私が作っていて下さい過ちは何のエラー

java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match. 
Old column names (1): _1 
New column names (6): customer_id, sku, type, event, control_group, event_date 
    at scala.Predef$.require(Predef.scala:233) 
    at org.apache.spark.sql.DataFrame.toDF(DataFrame.scala:224) 
    at org.apache.spark.sql.DataFrameHolder.toDF(DataFrameHolder.scala:36) 
    at kafka_receive_messages$$anonfun$main$1.apply(kafka_receive_messages.scala:77) 
    at kafka_receive_messages$$anonfun$main$1.apply(kafka_receive_messages.scala:69) 

を与えません。地図を分割すべきか?私たちがそれをすると、それをFDに変換することはできません( ".. columns ..")

ありがとうございました。

よろしく

バラ

答えて

0

立ち寄ってくれてありがとう。私はこれを整理しました。それはコーディング上の問題でした。将来的にはこれをしたい人のために、再び

バラ

kafkaStream.foreachRDD(rdd => { 
    if (rdd.count() == 0) { 
    println("No new SKU's received in this time interval " + Calendar.getInstance().getTime()) 
    } 
    else { 
    println("No of SKUs received " + rdd.count()) 
    rdd.map(record => (record._2).split(",")) 
    }.map(r => (r(0).replace(Quote,"").toInt,r(1).replace(Quote,"").toInt,r(2),r(3),r(4),r(5))).toDF("customer_id","sku","type","event","control_group","event_date").write.mode(SaveMode.Append).format("parquet").save(outputPath) 
    }) 

おかげで以下のように他の部分を変更してください

関連する問題