私のスパークアプリケーションでは、ループ内でデータフレームの操作を行い、結果をhdfsに書きたいと思います。再帰的データフレーム操作
擬似コード:上記の例で
var df = emptyDataframe
for n = 1 to 200000{
someDf=read(n)
df = df.mergeWith(somedf)
}
df.writetohdfs
"mergeWithは" unionAllをするとき、私は良い結果を得ます。
しかし、 "mergeWith"で(単純な)ジョインを行うと、ジョブは実際には遅くなり(2コアのエグゼキュータで4時間ごとに1時間以上)、完了することはありません(ジョブは異常終了します)。
私のシナリオでは、〜1mbのテキストデータを含むファイルで〜50回の繰り返しをスローします。
私の場合は、マージの順序が重要なので、これはDAGの生成によるものであり、データを保存した時点ですべてが実行されている可能性があります。
今、私はマージされたデータフレームで.persistを使用しようとしていますが、それはやや遅くなるようです。
EDIT:
ジョブがメモリ内のデータフレームは、静的なデータフレームのようには見えませんでした(私は数と.persistをやっていても)私が気づい走っていたとして。 これは実行されていたすべてのマージに対して、文字列で結ばれたパスのように見え、効果的にジョブを直線的に減速させます。
私はvar df
がこれの原因であると思われるのですか?問題の
内訳は、私はそれを見るように:
dfA = empty
dfC = dfA.increment(dfB)
dfD = dfC.increment(dfN)....
私はDF」ACおよびDは、対象となる違っを刺激し、私が続く場合は気にしませんか期待した場合再分配かどうか。し、RDDと再びDFを変換するときにDFの上で動作していない持続私は「休憩」系譜可能性を取り除くために
dfA = empty
dfC = dfA incremented with df B
dfD = ((dfA incremented with df B) incremented with dfN)....
アップデート2
:それはこのようになりますスパークする 。 これは少しのオーバーヘッドがありますが、許容できるものです(ジョブは数時間ではなく数分で完了します)。 永続化に関するいくつかのテストを実行し、回避策の形で回答を策定します。
結果: これはこれらの問題を表面上で解決するように見えます。実際に私は戻って振り出しにあるよ、あなたはこのようなコードがある場合はOOM例外にjava.lang.OutOfMemoryError: GC overhead limit exceeded
この「mergeWith」関数が何をすべきかはわかりません(あなたは共用と結合の両方を書く)。あなたは 'mergeWith'のコードを含めることができますか? –
'mergeWith'関数は多くのことができます。それは' union all'のときだけです。良い結果が得られます。 またはそれ以下のようなものになる可能性があります。 'SELECT f。* FROM full f LEFT OUTER JOIN delta i CONCAT(fa、fb)= CONCAT(ia、iz)CONCAT(ia、iz)がNULLの場合UNION ALL SELECT d。* FROM delta d' – Havnar
「共用」と「結合」の間には大きな違いがあります。 'union'の場合、Sparkは追加されたデータを書くだけでよく、' join'はデータをシャッフルする必要があります。もちろん、あなたのデータのサイズに応じて、「結合」を実行すると、OOM例外が簡単に発生する可能性があります。特に、小さなクラスタで実行しているためです。 –