2016-12-09 6 views
1

こんにちは、stackoverflowコミュニティ。スパーク:DAGを理解して変換を強制する

私の考えが正しいか、私のSparkの仕事でいくつかの点が欠けているかどうかを理解する上で助けを求めます。

私は現在、減算したい2つのrddを持っています。 両方のrddは、同じ父親のRDD上で異なる変換として構築されています。それが得られた後

まず、父RDDがキャッシュされている:

val fatherRdd = grandFather.repartition(n).mapPartitions(mapping).cache 

は次に、2つのRDDSが変換されます。

son1= rddFather.filter(filtering_logic).map(take_only_key).distinct 

他方は:

son1.subtract(son2) 

Iが期待:

son2= rddFather.filter(filtering_logic2).map(take_only_key).distinct 

二人の息子は、次にSON1にのみ鍵を得るために減算され 一つは、(擬似コード)であります変換の範囲は次のようになります。

  1. mapPartitions
  2. 配分
  3. キャッシング

そして、両方RDDSに異なる、次いで減算キャッシュされたデータ、マップ・フィルタ・マップから始まります。

これは起こっていませんが、2つのdistinctオペレーションが並行して実行されていることがわかります。キャッシュの利点を利用していない(スキップされたタスクはありません)とほぼ同じ計算時間です。 spark uiから撮影したダグの画像の下。 enter image description here

私には何か提案がありますか?

答えて

1

あなたの所見は正しいです。 RDDの変換は怠惰なので、RDDが実際に計算された後にキャッシングが行われます。

親RDDでアクションを呼び出す場合は、計算してキャッシュする必要があります。その後の操作は、キャッシュされたデータに対して行われます。

+1

ありがとうございます。私はおそらく、次のような文書の部分を見逃していました: "RDDにpersist()メソッドまたはcache()メソッドを使用して永続化することをマークできます。アクションで最初に計算されると、ノード上のメモリに保持されます" 私は 'cache'が効果的にすべてをキャッシュするためのアクションを実行すると考えていたので。 私のスパークジョブは、はるかに高速で、リソース割り当ての面でも優れています。 –

+0

PS。私はcount()を追加しました。 –

関連する問題