2017-10-07 12 views
0

私はApache Sparkを使ってTwitterデータをストリーミングしようとしましたが、ストリームデータをCSVファイルとして保存したいのですが、どうしてですか? コードを修正してcsvSpark-Scala:csvファイル(RDD)として保存

私はRDDを使用しています。

これは私のメインのコードです:

val ssc = new StreamingContext(conf, Seconds(3600)) 
val stream = TwitterUtils.createStream(ssc, None, filters) 

val tweets = stream.map(t => { 
    Map(
    // This is for tweet 
    "text" -> t.getText, 
    "retweet_count" -> t.getRetweetCount, 
    "favorited" -> t.isFavorited, 
    "truncated" -> t.isTruncated, 
    "id_str" -> t.getId, 
    "in_reply_to_screen_name" -> t.getInReplyToScreenName, 
    "source" -> t.getSource, 
    "retweeted" -> t.isRetweetedByMe, 
    "created_at" -> t.getCreatedAt, 
    "in_reply_to_status_id_str" -> t.getInReplyToStatusId, 
    "in_reply_to_user_id_str" -> t.getInReplyToUserId, 

    // This is for tweet's user 
    "listed_count" -> t.getUser.getListedCount, 
    "verified" -> t.getUser.isVerified, 
    "location" -> t.getUser.getLocation, 
    "user_id_str" -> t.getUser.getId, 
    "description" -> t.getUser.getDescription, 
    "geo_enabled" -> t.getUser.isGeoEnabled, 
    "user_created_at" -> t.getUser.getCreatedAt, 
    "statuses_count" -> t.getUser.getStatusesCount, 
    "followers_count" -> t.getUser.getFollowersCount, 
    "favorites_count" -> t.getUser.getFavouritesCount, 
    "protected" -> t.getUser.isProtected, 
    "user_url" -> t.getUser.getURL, 
    "name" -> t.getUser.getName, 
    "time_zone" -> t.getUser.getTimeZone, 
    "user_lang" -> t.getUser.getLang, 
    "utc_offset" -> t.getUser.getUtcOffset, 
    "friends_count" -> t.getUser.getFriendsCount, 
    "screen_name" -> t.getUser.getScreenName 
) 
}) 

tweets.repartition(1).saveAsTextFiles("~/streaming/tweets") 
+0

を与えられた溶液を使用してデータフレームにマップを変換することができ、スパーク暗黙の型変換

import spark.implicits._ tweets.toDF.write.csv(...) // saves as CSV 

を使用してデータフレームにつぶやきを変換します。後者の場合、フォーマットオプションとしてCSVをとる書き込みメソッドがあります –

+0

@ cricket_007 RDD –

+0

@ user8371915いいえ、DDD –

答えて

2

あなたはRDD [地図[文字列、]文字列] CSVとして保存するデータフレームにあるツイートを変換する必要があります。理由は簡単ですRDDにはスキーマがありません。 CSV形式には特定のスキーマがあります。したがって、RDDをスキーマを持つデータフレームに変換する必要があります。

これにはいくつかの方法があります。 1つのアプローチは、データをマップに入れる代わりにケース・クラスを使用することです。

case class(text:String, retweetCount:Int ...) 

Map(...)の代わりに、適切なパラメータを使用してケースクラスをインスタンス化します。

最後にあなたがRDDまたはデータセット/フレームを持っていますまた、あなたがhere

+0

のRDDを使用していますか? –

+0

私はそのアプローチを使用してデータフレームにマップを変換しようとしましたが、どのようにTwitterの情報(getText、getRetweetCount、...)を割り当てることができますか? –

+0

それは最後に動作します、ありがとう –

関連する問題