2017-09-07 13 views
0

設定は次のとおりです。160 GB、48個のvcoresを持つ3ノードのYARNクラスタのSpark 2.1。 動的割り当てが有効になっています。 spark.executor.memory=6Gspark.executor.cores=6非持続呼び出しよりも遅く持続する

まず、私はハイブテーブルを読んでいます:受注(329メガバイト)と左外側をやったLineItems(1.43ギガバイト)と が参加します。 次に、結合された データセット(var line1 = joinedDf.filter("linenumber=1")var line2 = joinedDf.filter("l_linenumber=2")など)に基づいて7つの異なるフィルタ条件を適用します。 結合されたデータセットを複数回フィルタリングしているため、結合データセットが完全にメモリに収まるため、永続化(MEMORY_ONLY)が役立つと思いました。私が持続していることに気づい

  1. 、スパークアプリケーションは、(3.5分対3.3分)なしの存続よりも実行に時間がかかります。 persistを使用すると、DAGは、1つのステージがpersistに対して作成され、他の下流のジョブがpersistが完了するのを待っていることを示します。 これは、persistがブロック呼び出しであることを意味しますか?または、永続化ブロックが使用可能になると、他のジョブのステージが処理を開始しますか?

  2. 非持続的なケースでは、異なるジョブが同じデータを読み取るために異なるステージを作成しています。データはさまざまな段階で複数回読み込まれますが、これは依然として持続的な場合よりも速くなっています。

  3. データセットが大きくなると、実際には、executorが メモリ(Javaヒープ領域)を使い果たしてしまいます。永続することなく、Sparkの仕事はうまくいっています。私はここでいくつかの他の提案を見ました:スパークjava.lang.OutOfMemoryError: Java heap space。 私はエクステンションのコアを増減させようとしましたが、 をディスクのみで保存し、パーティションを増やしてストレージの比率を変更しましたが、エグゼキュータのメモリに関する問題は何も役に立ちません。誰かがそれはメモリの問題のうち、トラブルシューティングについては移動する方法を、より重要なのではない-持続し、より高速であるものを例に作品を、持続どのように言及することができれば

私はそれをお願い申し上げます。

答えて

1

私はtransformationsactionsの違いをスパークで読むことをお勧めします。私はこれを自分自身に複数回咬まれたことを認めなければならない。

「スパーク内のデータが遅れて評価されます。これは、本質的に「アクション」が実行されるまで何も起こらないことを意味します。 .filter()関数は変換であるため、変換パイプラインにセクションを追加する場合を除いて、コードがそのポイントに達すると実際には何も起こりません。 .persist()への呼び出しも同様に動作します。

.persist()コールのコードダウンストリームで同時にトリガーできる複数のアクションがある場合は、実際にはそれぞれのアクションのデータを別々に「永続化」してメモリを使い果たしている可能性が非常に高いですSpark UIがデータセットのキャッシュされた%をキャッシュしています(100%を超えるキャッシュがある場合はここで説明しています)。データセットが2つの別々の変換パイプラインに分岐するコード内にポイントを持ちます(例ではそれぞれ.filter()です)。.persist()は、データソースの複数の読み込みを防止したり、高価なtransforフォークの前のパイプライン。

多くの場合、.persist()コールの直後(データフォークの前)に単一のアクションをトリガして、後で実行されるアクション(永続的にキャッシュされる) )独立したデータ。

TL; DR:

があなたの.persist()joinedDF.count()を行いますが、あなたの.filter()秒前。

+0

@ Travis-Hegnerさん、ありがとうございました....私は、ブランチした後に実行されるアクションのために、キャッシュされた部分が100%以上になるのを見ていました...複数のアクションに分岐する前に、 – sfbay

+0

@ travis-hegnerによって提案された変更を加えた後、私はキャッシュが最大100%になることに気付きました。しかし、実行時間はまだ変わらないが、非永続性は依然として持続性よりも速い。なぜそれが起こる可能性がありますか? – sfbay

+0

@ sfbay、私は推測することができます。なぜなら、それはあなたの '.filter()'の後に何が起こるかによって完全に決まるからです。 '.count()'の後ろに '.pountist()'が続いて '.join()'を生成する「アクション」を効果的にシリアライズして、次の「アクション」を続けるからだと思います。 '.persist()'がなければ、each '.filter()'は重複した系統を持つ別個の枝ですが、後のすべての "アクション"を同時に実行することができます。 –

関連する問題