2016-07-13 11 views
3

効率性と堅牢性を高めるために、より多くのDataFrame操作を使用するためにSparkアプリケーションを書き直しています。しかし、DataFramesで処理できないアプリケーションの一部があり、RDDにドロップする必要があります。その本質に剥奪、コードは次のようになります。Spark DataFrameによるパーティショニングを確実に行うにはどうすればいいですか?

C = A.join(B, join_key) # join_key is a string naming a column 
D = C.rdd.mapPartitions(do_something) 

を正しく動作させるために、do_somethingC.rddjoin_keyで仕切られている必要があります。私はだと思います。これは、equijoinsがキーでデータを分割し、キー値が同じペアを形成することで機能します。 SparkのRDD結合では、パーティションデータに対するイテレータによって暗黙的にペアが形成され、Sparkにイテレータを「マテリアライズ」しない限り、ペアが定義されたパーティションから離れることはありません私はここでやっていない結果を再分割します。私は、DataFrameの結合についても同じことが予想されます。

上記の説明では、が見つかりません希望のパーティションが保証されています。私はSpark実装の詳細についてはAPIを介して保証されていないので、100%安全であるかどうかはわかりません。 Catalystオプティマイザが、同じキーを共有するペアのグループに追加のパーティション境界を投げたり、分割したり、アルゴリズムを誤ってしまうことはないという保証はありません。

do_something関数を適用する前に、私は明示的にC.rdd.partitionBy(lambda x: x['join_key'])を実行することができますが、これは不要なシリアライズやシャッフルなどのオーバーヘッドを引き起こす可能性があると心配しています。

this blog postによると、HiveQLのDISTRIBUTE BYも使用できるようですが、やはりこのトリガーがどのようなオーバーヘッドになるかわかりません。

私の質問です:結合によって引き起こされる暗黙のパーティショニングに頼っても安全ですか、それとも明示的に保証する必要がありますか?もしそうなら、それを保証する最も効率的な方法は何ですか?私はPySpark 1.6.2で作業しています。

答えて

4

一般的に、特定の結合メカニズムは契約の一部ではなく、パーティション化の前提が失敗した場合、比較的簡単に合成例を構築できます。例えば、特定の条件にjoinは再分割トリガされませんどのBroadcastHashJoinのように表すことができます。

from pyspark.sql.functions import broadcast 

# Just so we can easily inspect the results 
sqlContext.setConf("spark.sql.shuffle.partitions", 4) 

a = (sc 
    .parallelize([(1, "a"), (2, "b"), (3, "a"), (4, "b")], 2) 
    .toDF(["id", "join_key"])) 

# Lets hint optimizer that b can be broadcasted 
b = broadcast(
    sc.parallelize([("a", "foo"), ("b", "bar")]).toDF(["join_key", "foobar"]) 
) 

c = a.join(b, "join_key") 
c.rdd.glom().collect() 

## [[Row(join_key='a', id=1, foobar='foo'), 
## Row(join_key='b', id=2, foobar='bar')], 
## [Row(join_key='a', id=3, foobar='foo'), 
## Row(join_key='b', id=4, foobar='bar')]] 

が参加する放送その下にいくつかの他の条件は、明示的なヒントなしで使用することができる(例えば、Databricks Guide - SQL, DataFrames & Datasets/BroadcastHashJoinを参照のこと)と保証ことはありません将来いくつかの追加メカニズムが追加されることはありません。

結果を確認したい場合は、明示的に再パーティション化する必要があります。

c.repartition("join_key").rdd.glom().collect() 

## [[], 
## [Row(join_key='b', id=2, foobar='bar'), 
## Row(join_key='b', id=4, foobar='bar')], 
## [Row(join_key='a', id=1, foobar='foo'), 
## Row(join_key='a', id=3, foobar='foo')], 
## []] 

ここでもう一つの問題は、効率性と堅牢性ためDataFramesを使用しています。あなたのロジックがPythonで直接データにアクセスすること(SQL式とは対照的に)に大きく依存している場合は、データを渡すことはかなりパターンに反します。DataFramesあなたは私の答えを確認することができますSpark functions vs UDF performance?同様の問題をカバーしています。多くの場合、データを移動するコストは、SQL最適化のすべての利点を簡単に消費するため、このアプローチにコミットする前にベンチマークを行うようにしてください。

+0

非常に有益な答えです。 1つのフォローアップ:もしあなたが推測しなければならないなら、もっと早くなると思いますか? 'rdd。(key).mapPartitions(do_something) 'または' data_frame.partitionBy(key).rdd.mapPartitions(do_something) 'のどちらかです。別の言い方をすれば、DataFramesにパーティショニングを単純に処理させ、残りの部分をRDDで処理させることによるメリットはありますか?彼らは基本的に同じことをやっているが、DataFrameはserdeと安全でない変換オーバーヘッドを追加しているので、答えはおそらくいいえのように聞こえるでしょうか? – Paul

+2

私はそれが役に立つとうれしいです。興味深い質問です。 RDD vs DataFrameに関して私はこれをベンチマークしようとしていないので、決定的な答えは得られませんが、おそらく 'DataFrame'を使うでしょう。これは特殊なメカニズムであるため、今ではなくてもさらに最適化することができます。 'df.partitionBy'は' DataFrame'を分割しないことを覚えておいてください。 DFを分割するには、答えにあるように 'repartition'を使うべきです。 – zero323

関連する問題