2017-11-20 13 views
0

Sparkを使用してディレクトリからパーティションに動的にデータを書きたいと思っています。 サンプルコードです。複数のファイルに同じキーのレコードを書き込む方法(カスタムパーティショナー)

val input_DF = spark.read.parquet("input path") 
input_DF.write.mode("overwrite").partitionBy("colname").parquet("output path...") 

以下に示すように、各キーごとにレコードのいかなるは異なるされず、スキューキーのためにそこにあります。このため input_DF.groupBy($ "COLNAME ").agg(カウント(" COLNAME"))。ショー()

+-----------------+------------------------+ 
|colname   |count(colname)   | 
+-----------------+------------------------+ 
|    NA|    14859816| --> More no of records 
|    A|     2907930| 
|    D|     1118504| 
|    B|     485151| 
|    C|     435305| 
|    F|     370095| 
|    G|     170060| 
+-----------------+------------------------+ 

、ジョブが合理的なメモリ(8ギガバイト)が与えられたときに失敗していますそれぞれのエグゼキュータのために。各エグゼキュータごとに高いメモリ(15GB)が与えられても完了するまでに時間がかかりすぎると、ジョブは正常に完了しています。

私はパーティション間でデータを均等に分配することを期待してパーティションを再利用しようとしました。しかし、デフォルトのHashPartitionerを使用するので、キーのレコードは単一のパーティションに移動します。

repartition(num partition,$"colname") --> Creates HashPartition 

しかし、これはrepartitonで述べたようにnum個の部品ファイルを作成しますが、(COL値NAを​​持つすべてのレコードがパーティションになる)パーティションへのキーのすべてのレコードを移動しています。残りのパーツファイルにはレコードがありません(パーケットメタデータのみ、38364バイト)。私は

  1. を知っていただきたいと思い

     -rw-r--r-- 2 hadoop hadoop   0 2017-11-20 14:29 /user/hadoop/table/_SUCCESS 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00000-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00001-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00002-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00003-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:07 /user/hadoop/table/part-r-00004-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00005-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00006-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop 1038264502 2017-11-20 13:20 /user/hadoop/table/part-r-00007-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00008-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00009-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00010-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00011-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00012-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00013-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00014-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop 128212247 2017-11-20 13:09 /user/hadoop/table/part-r-00015-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00016-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00017-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00018-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop 117142244 2017-11-20 13:08 /user/hadoop/table/part-r-00019-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00020-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop 347033731 2017-11-20 13:11 /user/hadoop/table/part-r-00021-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00022-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00023-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00024-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop 100306686 2017-11-20 13:08 /user/hadoop/table/part-r-00025-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop 36961707 2017-11-20 13:07 /user/hadoop/table/part-r-00026-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00027-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00028-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00029-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00030-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00031-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00032-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00033-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00034-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00035-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00036-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:07 /user/hadoop/table/part-r-00037-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00038-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00039-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00040-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00041-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  68859 2017-11-20 13:06 /user/hadoop/table/part-r-00042-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop 4031720288 2017-11-20 14:29 /user/hadoop/table/part-r-00043-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00044-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00045-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00046-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00047-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00048-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00049-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
         -rw-r--r-- 2 hadoop hadoop  38634 2017-11-20 13:06 /user/hadoop/table/part-r-00050-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet 
    

    は、データフレーム/ RDDの異なるパーティションに同一のキーレコードを書き込むための方法はありますか?はい、それは最高のようなパラメータデータフレーム/ RDDのパーティションあたりのバイトの一切を使用していない制御することができる

    (1st rec to partition 1) 
    (2nd rec to partition 2) 
    (3rd rec to partition 3) 
    (4th rec to partition 4) 
    (5th rec to partition 1) 
    (6th rec to partition 2) 
    (7th rec to partition 3) 
    (8th rec to partition 4) 
    
  2. 場合はおそらく、カスタムパーティショナーは、N番目のパーティションにすべてのN番目のレコードを書き込みます。

期待される結果は、単にキーに基づいて異なるサブディレクトリに(ハイブ用パーティション)データを書き込んでいるように、私は、各パートを書いて、複数のタスクへのキーのレコードを配布することで、データを書きたいですサブディレクトリ下のファイル。

+0

"partitionBy"で使用されたキーではなく、ユニークキーで修復が行われました。なんらかの理由でdataFrameがユニークでない場合は、 df.withColumn( "Unique_ID"、monotonicallyIncreasingId) を使用してsudoカラムを追加し、 "Unique_ID"で修復して複数のパーティションに均等にデータを分散できます。さらにパフォーマンスを向上させるために、DataFrameパーティション内で、結合/グループ/パーティションに使用されるキーでデータをソートすることができます – Ram

答えて

0

「partitionBy」で使用されたキーではなく、一意のキーで修復が行われたときに問題が解決されました。データフレームは、何らかの理由でユニークに欠落している場合、1は

df.withColumn("Unique_ID", monotonicallyIncreasingId) 

を、sudoを使用して列を追加し、「UNIQUE_ID」に賠償することができ、この方法は、私たちは、均等に複数のパーティションにデータを配布することができます。さらにパフォーマンスを向上させるために、DataFrameパーティション内のデータをジョイン/グループ/パーティションに使用するキーでソートすることができます。

関連する問題