2016-09-09 4 views
1

こんにちは私は、Apache Spark Streamingを使用してTwitterからTwitterを読んで、DataFrameに変換しようとしています。私は下に貼り付けたアプローチを持っています。しかし、私は正しいアプローチを得ることができません。いくつかの指針は大歓迎です。DStreamをデータフレームに変換する

foreach内でDFに変換するとわかるように、tweetStreamから1つのDFを取得することはできません。私はおそらく私がこれに新しいときに間違ったアプローチをしています。私はこれにどのようにアプローチするのですか?

val tweetStream = TwitterUtils.createStream(ssc, Utils.getAuth).filter(status=>status.getLang=="en") 
     .map(status=>gson.toJson(status)) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 
    tweetStream.foreachRDD({status=>val DF = status.toDF()}) 
+0

私はforeachRDD内で計算された全体のDFを取得するために、ループ内DF.merge()を使用して考えていました{} – Ayon

答えて

0
私はそれを試していないが、多分このようなものは動作します

var df_tweets:DataFrame = null 

    dstream_tweets.foreachRDD { 
    rrd => if (df_tweets != null) { 
     df_tweets = df_tweets.unionAll(rdd.toDF) // combine previous dataframe 
    } else { 
     df_tweets = rdd.toDF() // create new dataframe 
     } 
    } 
関連する問題