2017-08-01 9 views
2

私は次の操作を実行する必要のある多くのスパークデータフレームを持っている:sparkデータフレームで強制的に再パーティション化するには?

1) load a single spark dataframe 
2) select rows from it 
3) merge it with all of the previous spark dataframes 

、上記の操作のそれぞれが異なるのnumberOfパーティションが必要です。行を選択するには、100個のパーティションのような多くのパーティションが必要です。マージには、10パーティションのような非常に少ないパーティションが必要です。

だから、私は本当にそれがこのように仕事をしたい:今すぐ

1) load a single spark dataframe 
1.5) repartition into 100 partitions 
2) select rows from it 
2.5) repartition into 10 partitions 
3) merge it with all of the previous spark dataframes 

、どのように私は、ステップ1と2と2と3の間での間にパーティションを再作成し、これを強制するのですか?

私はdata = data.repartition(7)と呼んだとき、遅延評価されているので、実際に保存しているときにのみ再パーティション化することがわかります。

だから、私はこのようにそれをやっている:

1) load a single spark dataframe 
1.5) repartition into 100 partitions 
1.75) `df.count()` *just* to force materialization 
2) select rows from it 
2.5) repartition into 10 partitions 
2.75) `df.count()` *just* to force materialization 
3) merge it with all of the previous spark dataframes 

はここの間にパーティションを再作成するためにそれを強制するためのより良い方法はありますか?データフレームでcount()を実行するよりも良い方法はありますか?

答えて

4

sparkのデータフレームのすべての変換が遅延評価されるため、実際に変換を実行するアクションを実行する必要があります。現在、変換を強制する他の方法はありません。

利用可能なすべてのデータフレームアクションは、documentationアクション)にあります。あなたのケースでは、count()を使用して変換を強制するのではなく、かなり安くなるfirst()を使用することができます。

ステップ2.5では、シャッフルが完全に行われないように、repartition()coalesce()に置き換えることができます。これは、データの移動を最小限に抑えるので、新しいパーティション数が以前よりも少ない場合に、しばしば有利です。

EDIT:1)再分配、2)スパークデータフレームを変換、3)再区分:

はあなたがどんなアクションを使用すると、単純に実行しないとどうなるかについての質問に答えるために。最適化のために変換が実行されるため、必ずしもこの順序に従うとは限りません。

val df = spark.sparkContext.parallelize(Array((1.0,"a"),(2.0,"b"),(3.0,"c"),(1.0,"d"),(2.0,"e"),(3.0,"f"))).toDF("x", "y") 
val df1 = df.repartition(10).filter($"x" =!= 1.0).repartition(5).filter($"y" =!= "b") 
df1.explain(true) 

これは、データフレームの計算方法に関する情報を返します。ここに見られるように

== Parsed Logical Plan == 
'Filter NOT ('y = b) 
+- Repartition 5, true 
    +- Filter NOT (x#5 = 1.0) 
     +- Repartition 10, true 
     +- Project [_1#2 AS x#5, _2#3 AS y#6] 
      +- LogicalRDD [_1#2, _2#3] 

== Analyzed Logical Plan == 
x: double, y: string 
Filter NOT (y#6 = b) 
+- Repartition 5, true 
    +- Filter NOT (x#5 = 1.0) 
     +- Repartition 10, true 
     +- Project [_1#2 AS x#5, _2#3 AS y#6] 
      +- LogicalRDD [_1#2, _2#3] 

== Optimized Logical Plan == 
Repartition 5, true 
+- Project [_1#2 AS x#5, _2#3 AS y#6] 
    +- Filter ((NOT (_1#2 = 1.0) && isnotnull(_2#3)) && NOT (_2#3 = b)) 
     +- LogicalRDD [_1#2, _2#3] 

== Physical Plan == 
Exchange RoundRobinPartitioning(5) 
+- *Project [_1#2 AS x#5, _2#3 AS y#6] 
    +- *Filter ((NOT (_1#2 = 1.0) && isnotnull(_2#3)) && NOT (_2#3 = b)) 
     +- Scan ExistingRDD[_1#2,_2#3] 

repartition(10)ステップが含まれており、最適化中に削除されているように思われていません。

+0

しかし、私は実際にそれが何を出力しても気にしないとき、 'first()'のような無駄な操作を避ける方法はありますか?私はそれを再分割したいだけですが、実際に何が出力されるかは気にしません。それを避ける方法はありますか? – Sother

+0

残念ながら、すべての変換を適用するためにデータフレームに対してアクションを実行する必要があるため、現在のところ、それを避ける方法はありません。答えにいくつかの情報を追加しました。 – Shaido

+0

@Sother答えを更新しました。 – Shaido

関連する問題