2017-08-17 1 views
1

のためのかなりの時間を要し、比較的小さなシャッフルデータサイズのための時間を読んで(タスクあたり19メガバイト程度)スパークシャッフル読み取りは、私たちは、次の段階DAGを実行していると長いシャッフルを経験している小さなデータ

enter image description here

一つの興味深い点は、各エグゼキュータ/サーバ内の待機中のタスクには、同等のシャッフル読み取り時間があります。次のサーバーでは、1つのグループのタスクは約7.7分待機し、もう1つのタスクは約26秒待機します。ここ

enter image description here

同じステージの実行から別の例です。この図は、シャッフル読み込み時間が等しいタスクの一様なグループを持つ3人のエグゼキュータ/サーバを示しています。

enter image description here

はない、すべての執行は、そのようなものです:青のグループが原因投機的実行に殺されたタスクを表します。数秒ですべてのタスクをほぼ一貫して完了させるものもあります。これらのタスクのリモート読み取りデータのサイズは、他のサーバーで長時間待っているものと同じです。 さらに、このタイプのステージは、アプリケーションランタイム内で2回実行されます。大きなシャッフル読み取り時間でこれらのタスクグループを生成するサーバー/エグゼキュータは、各ステージの実行ごとに異なります。

enter image description hereそれはこのDAGの責任のコードのように見える

は以下の通りです:ここで

は切断/ホストのいずれかのタスクの統計情報テーブルの一例である

output.write.parquet("output.parquet") 
comparison.write.parquet("comparison.parquet") 
output.union(comparison).write.parquet("output_comparison.parquet") 
val comparison = data.union(output).except(data.intersect(output)).cache() 
comparison.filter(_.abc != "M").count() 

我々これについてあなたの考えを高く評価します。

+1

Strange。コードとデータのサンプルが評価されます。私はそのDAGのすべてのステップがキャッシュ呼び出しを持っているのを見て、あなたはすべてをキャッシュしていますか? – Garren

+0

こんにちは。あなたの質問をありがとう。上記のコードにコードを掲載しました。私たちはそれが必要と思うときにだけキャッシュしています。 – Dimon

+0

exceptとintersectの呼び出しが私のレーダーに懸念されています。 DAGはsortmergejoinを参照します。どのラインがトラブルを引き起こしているのか、あなたはすでに知っていますか? – Garren

答えて

0

明らかに問題はJVMガベージコレクション(GC)でした。タスクは、リモートエグゼキュータでGCが実行されるまで待つ必要がありました。同等のシャッフル読み取り時間は、いくつかのタスクがGCを実行する単一のリモートホスト上で待機していたことに起因します。私たちは助言に続いてhereと掲示され、問題は大きさの順序で減少した。リモートホストのGC時間とローカルシャッフル読み取り時間との間にはまだ小さな相関関係があります。将来はシャッフルサービスを試してみることにします。

関連する問題