2017-05-29 18 views
0

sparkとscalaの新機能です。 jsonファイルを含むディレクトリを読みたい。このファイルには、20種類の異なる値を持つことができる "EVENT_NAME"という属性があります。属性値に応じて、イベントを区切る必要があります。 EVENT_NAME = event_Aイベントをまとめて表示します。以下のようなハイブ外部テーブル構造でこれらを書く:/アプリケーション/ハイブ/倉庫/ DB/event_A/DT =日付/時間=時間スパークデータフレームをパーティションに書き込む

ここ

私が行くべき各イベントに関連するすべてのイベント・タイプとデータ用に20種類のテーブルを持っていますそれぞれのテーブルに。 私はいくつかのコードを書いていますが、正しくデータを書き込むためには助けが必要です。

{ 
import org.apache.spark.sql._ 
import sqlContext._ 

val path = "/source/data/path" 
val trafficRep = sc.textFile(path) 

val trafficRepDf = sqlContext.read.json(trafficRep) 
trafficRepDf.registerTempTable("trafficRepDf") 

trafficRepDf.write.partitionBy("EVENT_NAME").save("/apps/hive/warehouse/db/sample") 
} 

最後の行が分割された出力を作成しますが、私はそれを必要とする正確にどのようにではありません。それを行うには、どうすれば正しいか、他のコードを入手することをお勧めします。

答えて

1

で非推奨と置き換えられている、私はあなたが保存したい意味と仮定していますSpark/Hiveのを使用せずに、別々のディレクトリにデータをコピーするフォーマット。

Sparkのパーティション分割が強制的に使用されるため、SparkのpartitionByを使用することはできません。

は代わりに、あなたはそうのような、あなたのDataFrameそのコンポーネントのパーティションに侵入し、それらを一つ一つを保存する必要があります。

{ 
    import org.apache.spark.sql._ 
    import sqlContext._ 

    val path = "/source/data/path" 
    val trafficRep = sc.textFile(path) 

    val trafficRepDf = sqlContext.read.json(trafficRep) 
    val eventNames = trafficRepDf.select($"EVENT_NAME").distinct().collect() // Or if you already know what all 20 values are, just hardcode them. 
    for (eventName <- eventNames) { 
    val trafficRepByEventDf = trafficRepDef.where($"EVENT_NAME" === eventName) 
    trafficRepByEventDf.write.save(s"/apps/hive/warehouse/db/sample/${eventName}") 
    } 
} 
0

私は、あなたがEVENT_NAMEdthourで仕切るので、これを試してみる必要がある、あなたは/apps/hive/warehouse/db/EVENT_NAME=xx/dt=yy/hour=zzのようなテーブル構造をしたいと仮定します。

trafficRepDf.write.partitionBy("EVENT_NAME","dt","hour").save("/apps/hive/warehouse/db/sample") 
+0

データがそれに日付と時刻の情報を持っていません。私はそれを外部に提供する必要があります。 – Anup

1

あなたのデータフレームに日付と時間の列を追加することができます。

import org.apache.spark.sql._ 
import sqlContext._ 

val path = "/source/data/path" 
val trafficRep = sc.textFile(path) 

val trafficRepDf = sqlContext.read.json(trafficRep) 
trafficRepDf.withColumn("dt", lit("dtValue")).withColumn("hour", lit("hourValue")) 

trafficRepDf.write.partitionBy("EVENT_NAME","dt","hour").save("/apps/hive/warehouse/db/sample") 
関連する問題