2016-09-02 12 views
1

大きな寄せ木細工ファイルをHDFSの異なるフォルダーにある複数の寄木細工ファイルに分割したいので、パーティションテーブル(Hive/Drill/Spark SQL)その上に。1つの大きな寄木細工ファイルをキーで複数の寄木細工ファイルに分割

データ例:

+-----+------+ 
|model| num1| 
+-----+------+ 
| V80| 195.0| 
| V80| 750.0| 
| V80| 101.0| 
| V80| 0.0| 
| V80| 0.0| 
| V80| 720.0| 
| V80|1360.0| 
| V80| 162.0| 
| V80| 150.0| 
| V90| 450.0| 
| V90| 189.0| 
| V90| 400.0| 
| V90| 120.0| 
| V90| 20.3| 
| V90| 0.0| 
| V90| 84.0| 
| V90| 555.0| 
| V90| 0.0| 
| V90| 9.0| 
| V90| 75.6| 
+-----+------+ 

結果フォルダ構造は、 "モデル" フィールドでグループ化する必要があります。

def main(args: Array[String]): Unit = { 
    val conf = new SparkConf() 
    case class Infos(name:String, name1:String) 
    val sc = new SparkContext(conf) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    val rdd = sqlContext.read.load("hdfs://nameservice1/user/hive/warehouse/a_e550_parquet").select("model", "num1").limit(10000) 

    val tmpRDD = rdd.map { item => (item(0), Infos(item.getString(0), item.getString(1))) }.groupByKey() 

    for (item <- tmpRDD) { 
     import sqlContext.implicits._ 
     val df = item._2.toSeq.toDF() 
     df.write.mode(SaveMode.Overwrite).parquet("hdfs://nameservice1/tmp/model=" + item._1) 
    } 
    } 

ちょうど投げた:私はこのようなスクリプトを試してみました

+ 
| 
+-----model=V80 
|  | 
|  +----- XXX.parquet 
+-----model=V90 
|  | 
|  +----- XXX.parquet 

ヌルポイント例外が発生します。

答えて

1

DataFrameのpartitionByを使用する必要があります。 groupByは必要ありません。以下のようなものは、あなたが望むものを与えるはずです。

val df = sqlContext.read.parquet("hdfs://nameservice1/user/hive/warehouse/a_e550_parquet").select("model", "num1").limit(10000) 
df.write.partitionBy("model").mode(SaveMode.Overwrite) 
+0

あなたは正しいですが、これを試しました。しかし、それは非常に遅いようですが、約500Mの記録があります。効率的な方法はありますか? –

+0

これが遅い理由はたくさんあります。あなたがチェックしなければならないことのいくつかは、ジョブがあまりにもシャッフルされている(シャッフルパーティションのサイズを増減できるかどうか、データスキューなどがあるかどうかを参照してください)。あなたはこれをチェックする正しい人です:-) – Jegan

関連する問題