こんにちは、stackoverflowコミュニティ。スパーク:DAGを理解して変換を強制する
私の考えが正しいか、私のSparkの仕事でいくつかの点が欠けているかどうかを理解する上で助けを求めます。
私は現在、減算したい2つのrddを持っています。 両方のrddは、同じ父親のRDD上で異なる変換として構築されています。それが得られた後
まず、父RDDがキャッシュされている:
val fatherRdd = grandFather.repartition(n).mapPartitions(mapping).cache
は次に、2つのRDDSが変換されます。
son1= rddFather.filter(filtering_logic).map(take_only_key).distinct
他方は:
son1.subtract(son2)
Iが期待:
son2= rddFather.filter(filtering_logic2).map(take_only_key).distinct
二人の息子は、次にSON1にのみ鍵を得るために減算され 一つは、(擬似コード)であります変換の範囲は次のようになります。
- mapPartitions
- 配分
- キャッシング
そして、両方RDDSに異なる、次いで減算キャッシュされたデータ、マップ・フィルタ・マップから始まります。
これは起こっていませんが、2つのdistinct
オペレーションが並行して実行されていることがわかります。キャッシュの利点を利用していない(スキップされたタスクはありません)とほぼ同じ計算時間です。 spark uiから撮影したダグの画像の下。
私には何か提案がありますか?
ありがとうございます。私はおそらく、次のような文書の部分を見逃していました: "RDDにpersist()メソッドまたはcache()メソッドを使用して永続化することをマークできます。アクションで最初に計算されると、ノード上のメモリに保持されます" 私は 'cache'が効果的にすべてをキャッシュするためのアクションを実行すると考えていたので。 私のスパークジョブは、はるかに高速で、リソース割り当ての面でも優れています。 –
PS。私はcount()を追加しました。 –