2017-11-09 8 views
3

は、ここに私のデータフレームである(2.2をスパーク):dataset.countがシャッフルを引き起こすのはなぜですか?

enter image description here

根本的なRDDは、2つのパーティション

enter image description here enter image description here

私はdf.countを行う

を持って、生産DAGは です enter image description here

df.rdd.countを実行すると、DAGは次のようになります。

enter image description here

QUES:Countがスパークでのアクションで、公式の定義は「データフレーム内の行数を返します。」。さて、私がデータフレーム上でカウントを実行すると、なぜシャッフルが起こっていますか?また、基本的なRDDで同じことをしてもシャッフルは起こりません。

なぜシャッフルが起こるのかはわかりません。私はここでカウントのソースコードを調べようとしましたspark githubしかし、それは完全に私に意味をなさない。 "groupby"はアクションの犯人に供給されていますか?

PS。 df.coalesce(1).countはシャッフルを起こさない

答えて

2

sparkがデータフレーム操作を行っているときは、最初にすべてのパーティションの部分カウントを計算し、次にそれらを合計する別のステージがあります。これは、大規模なデータフレームに特に適しています。ここでは、複数のエグゼキュータへのカウントの分配が実際にパフォーマンスに追加されます。これを確認する

場所は、以下の物理的な計画の記述のいくつかの並べ替えを持っているでしょうスパークUIのSQLタブです:

*HashAggregate(keys=[], functions=[count(1)], output=[count#202L]) 
+- Exchange SinglePartition 
    +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#206L]) 
+0

いくつかの意味があります。 rdd.countの場合はどうなりますか? rddに2つのパーティションがあるとします。 – bigdatamann

+0

これらのRDDパーティションは、操作の瞬間に同じエグゼキュータ上にある可能性が最も高いです。私は、DFのメカニックを重視していましたが、RDDの詳細はわかりません。 RDDの実用的なことの1つは、データフレームを変換する必要がある場合や、あまり構造化されていないソースからデータフレームを作成する必要がある場合にのみ使用することです。構造化されたデータの処理では、データフレームの方が一般に高速です。 –

+0

私は3つのパーティション化されたdfでテストを実行し、個々のパーティションカウントが1つのステージで計算されていることを確認してから、3つのパーティションを書き込むシャッフルが発生しました。次の段階では、これらの3つのパーティションを読み取り、それらを合計します。しかし、このようなことは意味をなさない。なぜなら、シャッフルは狭い依存変換+アクションのように見えるからだ。 – bigdatamann

1

データフレームのカウント動作がシャッフルで結果GROUPBYを使用しているようです。あなたはRDDのカウント機能を見れば、それは配列として各パーティションの合計を返し、その後、合計に.SUM使用するか、各パーティションに集約関数に渡されますが、以下のhttps://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

* Returns the number of rows in the Dataset. 
* @group action 
* @since 1.6.0 
*/ 
def count(): Long = withAction("count", groupBy().count().queryExecution) { 
plan => 
plan.executeCollect().head.getLong(0) 
} 

からのコードです配列の要素このリンクから

コードスニペット: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala

/** 
* Return the number of elements in the RDD. 
*/ 
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum 
+0

ありがとうPratyush。いくつかの質問: 1. "groupBy()。count()。queryExecution"はどのくらい正確に動作しますか? groupbyとcountの両方がメソッドなので、 2.「sc.runJob(this、Utils.getIteratorSize _)」の下線はどういう意味ですか? – bigdatamann

関連する問題