効率性と堅牢性を高めるために、より多くのDataFrame操作を使用するためにSparkアプリケーションを書き直しています。しかし、DataFramesで処理できないアプリケーションの一部があり、RDDにドロップする必要があります。その本質に剥奪、コードは次のようになります。Spark DataFrameによるパーティショニングを確実に行うにはどうすればいいですか?
C = A.join(B, join_key) # join_key is a string naming a column
D = C.rdd.mapPartitions(do_something)
を正しく動作させるために、do_something
はC.rdd
がjoin_key
で仕切られている必要があります。私はだと思います。これは、equijoinsがキーでデータを分割し、キー値が同じペアを形成することで機能します。 SparkのRDD結合では、パーティションデータに対するイテレータによって暗黙的にペアが形成され、Sparkにイテレータを「マテリアライズ」しない限り、ペアが定義されたパーティションから離れることはありません私はここでやっていない結果を再分割します。私は、DataFrameの結合についても同じことが予想されます。
上記の説明では、が見つかりません希望のパーティションが保証されています。私はSpark実装の詳細についてはAPIを介して保証されていないので、100%安全であるかどうかはわかりません。 Catalystオプティマイザが、同じキーを共有するペアのグループに追加のパーティション境界を投げたり、分割したり、アルゴリズムを誤ってしまうことはないという保証はありません。
do_something
関数を適用する前に、私は明示的にC.rdd.partitionBy(lambda x: x['join_key'])
を実行することができますが、これは不要なシリアライズやシャッフルなどのオーバーヘッドを引き起こす可能性があると心配しています。
this blog postによると、HiveQLのDISTRIBUTE BY
も使用できるようですが、やはりこのトリガーがどのようなオーバーヘッドになるかわかりません。
私の質問です:結合によって引き起こされる暗黙のパーティショニングに頼っても安全ですか、それとも明示的に保証する必要がありますか?もしそうなら、それを保証する最も効率的な方法は何ですか?私はPySpark 1.6.2で作業しています。
非常に有益な答えです。 1つのフォローアップ:もしあなたが推測しなければならないなら、もっと早くなると思いますか? 'rdd。(key).mapPartitions(do_something) 'または' data_frame.partitionBy(key).rdd.mapPartitions(do_something) 'のどちらかです。別の言い方をすれば、DataFramesにパーティショニングを単純に処理させ、残りの部分をRDDで処理させることによるメリットはありますか?彼らは基本的に同じことをやっているが、DataFrameはserdeと安全でない変換オーバーヘッドを追加しているので、答えはおそらくいいえのように聞こえるでしょうか? – Paul
私はそれが役に立つとうれしいです。興味深い質問です。 RDD vs DataFrameに関して私はこれをベンチマークしようとしていないので、決定的な答えは得られませんが、おそらく 'DataFrame'を使うでしょう。これは特殊なメカニズムであるため、今ではなくてもさらに最適化することができます。 'df.partitionBy'は' DataFrame'を分割しないことを覚えておいてください。 DFを分割するには、答えにあるように 'repartition'を使うべきです。 – zero323