2017-01-23 9 views
0

私たちのクラスタでは、Sparkをスタンドアロンモードで実行しています(100台のマシン、16台のCPUコア、マシンあたり32GBのRAM)。 アプリケーションの実行中にSPARK_WORKER_MEMORYおよびSPARK_WORKER_CORESを指定します。Sparkはどのマシンでどのようにマルチコアの並列処理を利用していますか?

Sparkプログラミングでは、あたかもそれがシリアルプログラムであるかのようにプログラムして、Sparkフレームワークがタスクを自動的に並列化します。

SPARK_WORKER_CORES = 16でプログラムを実行したときにOOMクラッシュが発生しました。プログラムが正常に完了したSPARK_WORKER_CORES = 4で再試行しました。

データ並列処理で複数のスレッドを利用するには、より大きなメモリが必要ですが、Sparkプログラムのどの機能が複数のスレッドによって並列化されているのかわかりません。だから私はどの機能がOOMを担当しているのか分かりません。

データの各RDDパーティションがメモリに収まるように、マシンの総数と1人のマシン(マシン)ごとのメモリの量を考慮して、RDDパーティションの数(並列度)を制御します。

RDDを分割した後、マシン内のワーカーは、各RDDパーティションのユーザー定義関数を呼び出して処理します。

ここで私はSparkがどのマシンでどのようにマルチコアの並列処理を利用するのかという質問がありますか?

複数のスレッドによってどの機能が並列化されていますか? あまりにも多くのメモリを使用しないように特別な注意を払う必要はありますか?

おかげ

+0

私は何かをする機能はすべて並列化されていると思います。より多くの情報を得ることなく「どの機能」を言うのは本当に難しいです - あなたはスカラを使っていますか? Python? Java? Sparkは遅れて評価するので、count()、collect()などの何かを "行う"ようになるまで、何も配布されません。それは役に立ちますか? – flyingmeatball

+0

@flyingmeatballプログラムはRDDのforeachPartition操作を使用します。 Sparkはどのように各RDDパーティションの計算を並列化しますか? – syko

+0

.map、.flatmap、.reduceなどのより標準的な構文ではなく、foreachPartitionを使用している特別な理由はありますか? (ここを参照:http://stackoverflow.com/questions/25914789/how-do-iterate-rdds-in-apache-spark-scala)。メモリが不足している場合は、コアあたりのコア*メモリ+使用可能な他のメモリ>使用可能なメモリを意味します。これは、既に解決策が見つかったようです... Executorの数をダイヤルします。エグゼクティブ。 – flyingmeatball

答えて

0

スパーク(RDDがスピッティングおよびクラスタ全体に分散された)すべてのパーティション上のあなたのロジックを実行します。各エグゼキュータには専用のコア数とメモリが事前に定義されています。リソースエグゼキュータは、ドライバによって送信されたタスクを実行するために使用されるタスクスロットを提供します。最良の状態では、実行スロットで実行可能なタスクスロットがあれば、それはそのスロットを予約し、そうでなければ同じノード上の他の実行プログラムのタスクスロットを使用し、利用可能なタスクスロットがなく、クラスタ(ラック)ネットワークを介したレベル転送。一般にOOMは、ドライバをtoArray()、RDDのすべてのパーティションを1つのノードにまとめるcollect()などのように、すべての日付を1か所に集めるときに発生します。一方、パーティションの処理段階で、エグゼキュータ・メモリーとエグゼキュータ・メモリーのオーバーヘッドがコンテナー全体のメモリーを超えた場合は、エグゼキューターで発生する可能性があります。

関連する問題