2016-05-29 9 views
5

私は200MM以上の価値があるフラットファイル約10枚のアプリケーションを持っています。ビジネスロジックは、すべてを順次結合することに関係します。スパークRDD - シャッフルを避ける - パーティション分割は膨大なファイルを処理するのに役立ちますか?

私の環境: 1マスター - 3つの(テストのために私は、各ノードに1GBのメモリを割り当てた)スレーブ

は、コードのほとんどちょうどそれぞれが

RDD1 = sc.textFile(file1).mapToPair(..) 

RDD2 = sc.textFile(file2).mapToPair(..) 

join = RDD1.join(RDD2).map(peopleObject) 

どれ提案の参加については、以下を行いますチューニング、再パーティション化、並列化など..?そうであれば、パーティション分割に適した数を提示するベストプラクティスですか?私たちは常に1 RDDに参加している場合は、現在の設定ジョブは時間以上かかり、私はほとんどすべてのファイルのシャッフルの書き込みを見ると

>は3ギガバイト

+0

ファイル:根拠は、Spark-UIから取られた以下の美しいPICに提示されていますか?いくつのパーティションがありますか? – marios

+0

No。それらはaws s3にあり、まだパーティションを作成していませんが、内部的にはデフォルトの並列処理を使用している可能性があります。 – sve

+0

RDD1.partitions.sizeを実行するか、「RDD1.toDebugString」を実行して、どのくらいのパーティションの数を確認できますか? – marios

答えて

2

である(例えばRDD1)他のすべてと、アイデアそのRDDを分割して永続化することです。ここで

は(簡単にはPythonやJavaに変換できる)のsudo-Scalaの実装です:

val rdd1 = sc.textFile(file1).mapToPair(..).partitionBy(new HashPartitioner(200)).cache() 

ここまで、我々は200個のパーティションにハッシュされるようにRDD1ています。初めて評価されると、永続化されます(キャッシュされます)。

もう2つのrddを読み込み、それらに参加しましょう。

val rdd2 = sc.textFile(file2).mapToPair(..) 
val join1 = rdd1.join(rdd2).map(peopleObject) 
val rdd3 = sc.textFile(file3).mapToPair(..) 
val join2 = rdd1.join(rdd3).map(peopleObject) 

リマイングRDDでは、パーティションを分割したり、キャッシュしたりしないことに注意してください。

rdd1は既にハッシュされたパーティションであり、残りのすべてのジョインに同じパーティションが使用されます。したがって、rdd2とrdd3は、rdd1のキーが置かれているのと同じ場所にキーをシャッフルします。

私たちはパーティションを作っていないと仮定し、質問に示されたものと同じコードを使用します。参加するたびに、両方のrddがシャッフルされます。つまり、rdd1にN個の結合があると、非パーティションバージョンはrdd1をN回シャッフルします。分割されたアプローチは、rdd1を1回シャッフルします。

+0

最初のRDDをキャッシュすると何が得られますか? – axiom

+0

すべてのキーで自宅が見つかると、すべての参加が完了するまでそこにとどまります。 – marios

+0

'rdd1'は最初の結合が呼び出されると一度マテリアライズされます。今後キャッシュされる予定ですが、OPで指定されたコードに従って、後で使用されることはありません。結合が完了するまで、 'rdd1'は必要ありません。私はあなたが少し異なるユースケースを提示しているのを見ます。 OPが必要です。rdd1.join(rdd2).... join(rddN)IMO。あなたの答えに提示されたコードでは、キャッシュは間違いなく役に立ちます。 – axiom

3

実際には、大規模なデータセット(5,100G +それぞれ)を使用して、参加を開始する前に、一連の参加に関係するすべてのRDDを共同でパーティション化すると、

RDD1 = sc.textFile(file1).mapToPair(..).partitionBy(new HashPartitioner(2048)) 

RDD2 = sc.textFile(file2).mapToPair(..).partitionBy(new HashPartitioner(2048)) 
. 
. 
. 
RDDN = sc.textFile(fileN).mapToPair(..).partitionBy(new HashPartitioner(2048)) 

//start joins 

RDD1.join(RDD2)...join(RDDN)


サイドノート: 私は参加ツリーとして参加(各RDDが一度使用)、この種のを参照してください。 HDFS上で保存

enter image description here

+0

/@ marioの下でスパークスタンドアロンクラスターを実行しています。簡単な説明をありがとう。実際には、両方のユースケースを順番にRDD1と他のRDDに参加させるだけでなく、RDDn1、RDDn2、およびRDD1との結果を結合します。提供された例を見ると、すべてのRDDを分割し、プライマリRDDをキャッシュすると、パフォーマンスが向上することがわかります。私にこの権利があるかどうか知らせてください。 – sve

+0

@SpringStarter上記のケースでは、必要のないものにスペースを浪費しているので、キャッシングは実際に傷つくでしょう。しかし、あなたが言及した他のユースケースでは、キャッシングが本当に助けになるでしょう。 – axiom

関連する問題