2

私は、2列 - col1col2を持つスパークデータフレームを持っています。なぜSpark DataFrameが間違ったパーティション数を作成していますか?

scala> val df = List((1, "a")).toDF("col1", "col2") 
df: org.apache.spark.sql.DataFrame = [col1: int, col2: string] 

私はcol1に一意の値の数に等しいファイルの数のすべてのデータを書き込むために、parquet形式でディスクにdfを書くとき、私はこのように、col1を使用してrepartitionの操作を行います。

scala> df.repartition(col("col1")).write.partitionBy("col1").parquet("file") 

上記のコードは、ファイルシステム内に1つのファイルしか生成しません。しかし、シャッフル処理の回数は、私がcol1 1つの値のみ、すなわち、1が含まれている場合、なぜそれがrepartitionで200個のパーティションを作成していることを、ここで一つのことを理解することはできませんよ200

enter image description here

なり?スパークSQLシャッフル世界で

答えて

4

repartition(columnName)によって制御されている200は、何の問題のどのように多くのユニークな値、(より具体的な、spark.sql.shuffle.partitionsパーティション)200個のパーティションを作成していませんcol1あります。ユニークな値がcol1の場合は、199個のパーティションが空になります。一方、一意の値がcol1の場合、パーティションごとに複数の値がcol1になります。

パーティションが1つだけの場合は、repartition(1,col("col1"))またはcoalesce(1)とすることができます。しかし​​3210はあなたのパーティションの内容を確認したい場合は、私がしました

How to prevent Spark optimizationを参照)​​3210私は、あなたが並列性を失うことが、あなたのコードsuscht中まで更に移動するという意味で同じように動作しないではないことこれには2つの方法があります:

// calculates record count per partition 
def inspectPartitions(df: DataFrame) = { 
    import df.sqlContext.implicits._ 
    df.rdd.mapPartitions(partIt => { 
     Iterator(partIt.toSeq.size) 
    } 
    ).toDF("record_count") 
} 

// inspects how a given key is distributed accross the partition of a dataframe 
def inspectPartitions(df: DataFrame, key: String) = { 
    import df.sqlContext.implicits._ 
    df.rdd.mapPartitions(partIt => { 
     val part = partIt.toSet 
     val partSize = part.size 
     val partKeys = part.map(r => r.getAs[Any](key).toString.trim) 
     val partKeyStr = partKeys.mkString(", ") 
     val partKeyCount = partKeys.size 
     Iterator((partKeys.toArray,partSize)) 
    } 
    ).toDF("partitions","record_count") 
} 

次のようにデータフレームを確認してください:

inspectPartitions(df.repartition(col("col1"),"col1") 
.where($"record_count">0) 
.show 
0

、シャッフルパーティションのデフォルト数はデフォルトごとにspark.sql.shuffle.partitions

関連する問題