2016-11-22 30 views
2

私のスパークアプリケーションでは、ループ内でデータフレームの操作を行い、結果をhdfsに書きたいと思います。再帰的データフレーム操作

擬似コード:上記の例で

var df = emptyDataframe 
for n = 1 to 200000{ 
    someDf=read(n) 
    df = df.mergeWith(somedf) 
} 
df.writetohdfs 

"mergeWithは" unionAllをするとき、私は良い結果を得ます。

しかし、 "mergeWith"で(単純な)ジョインを行うと、ジョブは実際には遅くなり(2コアのエグゼキュータで4時間ごとに1時間以上)、完了することはありません(ジョブは異常終了します)。

私のシナリオでは、〜1mbのテキストデータを含むファイルで〜50回の繰り返しをスローします。

私の場合は、マージの順序が重要なので、これはDAGの生成によるものであり、データを保存した時点ですべてが実行されている可能性があります。

今、私はマージされたデータフレームで.persistを使用しようとしていますが、それはやや遅くなるようです。

EDIT:

ジョブがメモリ内のデータフレームは、静的なデータフレームのようには見えませんでした(私は数と.persistをやっていても)私が気づい走っていたとして。 これは実行されていたすべてのマージに対して、文字列で結ばれたパスのように見え、効果的にジョブを直線的に減速させます。

私はvar dfがこれの原因であると思われるのですか?問題の

spiraling out of controle

内訳は、私はそれを見るように:

dfA = empty 
dfC = dfA.increment(dfB) 
dfD = dfC.increment(dfN).... 

私はDF」ACおよびDは、対象となる違っを刺激し、私が続く場合は気にしませんか期待した場合再分配かどうか。し、RDDと再びDFを変換するときにDFの上で動作していない持続私は「休憩」系譜可能性を取り除くために

dfA = empty 
dfC = dfA incremented with df B 
dfD = ((dfA incremented with df B) incremented with dfN).... 

アップデート2

:それはこのようになりますスパークする 。 これは少しのオーバーヘッドがありますが、許容できるものです(ジョブは数時間ではなく数分で完了します)。 永続化に関するいくつかのテストを実行し、回避策の形で回答を策定します。

結果: これはこれらの問題を表面上で解決するように見えます。実際に私は戻って振り出しにあるよ、あなたはこのようなコードがある場合はOOM例外にjava.lang.OutOfMemoryError: GC overhead limit exceeded

+0

この「mergeWith」関数が何をすべきかはわかりません(あなたは共用と結合の両方を書く)。あなたは 'mergeWith'のコードを含めることができますか? –

+0

'mergeWith'関数は多くのことができます。それは' union all'のときだけです。良い結果が得られます。 またはそれ以下のようなものになる可能性があります。 'SELECT f。* FROM full f LEFT OUTER JOIN delta i CONCAT(fa、fb)= CONCAT(ia、iz)CONCAT(ia、iz)がNULLの場合UNION ALL SELECT d。* FROM delta d' – Havnar

+0

「共用」と「結合」の間には大きな違いがあります。 'union'の場合、Sparkは追加されたデータを書くだけでよく、' join'はデータをシャッフルする必要があります。もちろん、あなたのデータのサイズに応じて、「結合」を実行すると、OOM例外が簡単に発生する可能性があります。特に、小さなクラスタで実行しているためです。 –

答えて

0

次は、私が使い終わったものです。それは私のユースケースにとって十分に機能的ですが、機能し、永続する必要はありません。

非常に修正ではなく回避策です。

val mutableBufferArray = ArrayBuffer[DataFrame]() 
mutableBufferArray.append(hiveContext.emptyDataframe()) 

for loop { 

       val interm = mergeDataFrame(df, mutableBufferArray.last) 
       val intermSchema = interm.schema 
       val intermRDD = interm.rdd.repartition(8) 


       mutableBufferArray.append(hiveContext.createDataFrame(intermRDD, intermSchema)) 
       mutableBufferArray.remove(0) 

} 

これは、私は、コンプライアンスにタングステンを格闘方法です。 DFからRDDに戻って戻ると、タングステン全体が前面から後ろに向かってプロセスパイプを生成するのではなく、実際のオブジェクトで終わります。

私のコードでは、ディスクに書き込む前に数回反復します(50-150回の反復が最適です)。それでは、bufferArrayをクリアして新鮮なものからやり直します。

0

を得る:

var df = sc.parallelize(Seq(1)).toDF() 

for(i<- 1 to 200000) { 
    val df_add = sc.parallelize(Seq(i)).toDF() 
    df = df.unionAll(df_add) 
} 

が続いてDFがその後400000のパーティションを持って、あなたが持っているので、(次のアクションが非効率的になりますパーティションごとに1つのタスク)。

たとえば、パーティション数を減らすようにしてください。 (例えば、df.coalesce(200).write.saveAsTable(....)などを使用して)データフレームを永続化する前に200を設定します。

+0

私はすでに出力を合体しています。申し訳ありませんが、私はそれについて言及しませんでした。 ここで問題となるのは、データフレーム内のレコードを更新する場合です(ユニオンですべてを追加するだけではありません) したがって、実行時には、持続するときに速度が遅くなります(遅くならないように) 再パーティション化は役に立ちません。私が意味するものを明確にするために – Havnar

+0

;永続化するか 反復処理中に再分割する...すべて同じ結果(問題の画像を参照)が最後に合体し、問題をHDFSに書き出す必要がある場所に移動するだけです – Havnar