2017-06-01 8 views
0

キャッシュされた結果をリストに保存し、それぞれの最後のリストのすべてのデータで新しいDataFrameを計算しようとすると問題が発生しました繰り返し。しかし、空のDataFrameを使用して毎回空の結果を取得しても、関数は約8〜12ラウンド後に突然非常に遅くなります。ここでSpark Dataframeが突然古いキャッシュデータを繰り返し過度に再利用すると、急激に遅くなる

は私のコード

testLoop(Nil) 
def testLoop(lastDfList:List[DataFrame]){  
    // do some dummy transformation like union and cache the result 
    val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf)}.cache   

    // always get 0, of course 
    println(resultDf.count) 

    // benchmark action 
    benchmark(resultDf.count)  

    testLoop(resultDf::lastDfList) 
} 

ベンチマーク結果 1~6 round : < 200ms 7 round : 367ms 8 round : 918ms 9 round : 2476ms 10 round : 7833ms 11 round : 24231ms

である私は、私はすでに空のデータフレームを使用しているためGCまたはブロックの立ち退きが私の場合は問題だと思うが、私ドンはありません何が原因なのか分からない?私はキャッシュや何かの意味を誤解していますか?

ありがとうございます!

spark.sparkContext.setCheckpointDir("/tmp") 

testLoop(Nil) 
def testLoop(lastDfList:List[DataFrame]){  
    // do some dummy transformation like union and cache the result 
    val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf)}.cache   

    resultDf.checkpoint() 

    // always get 0, of course 
    println(resultDf.count) 

    // benchmark action 
    benchmark(resultDf.count)  

    testLoop(resultDf::lastDfList) 
} 

しかし、それはまだ数回の反復の後に非常に遅くなる:


ImDarrenGのソリューションを読んだ後、私は次のように私のコードを変更しました。ここで

答えて

1

あなたはlastDfListの先頭にresultDfを追加することにより、DataFramesのリストを作成し、testLoopの次の反復にそれを渡す:

testLoop(resultDf::lastDfList) 

のでlastDfListが長く、各パスを取得します。

この行はlastDfListの各メンバーをINGのunionによって新しいDataFrameを作成:

val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf))}.cache 

lastDfListの各メンバーが、それの前任者の組合であるため、スパークが各パスに対して指数関数的に大きくなる系統を維持していますtestLoopです。

私は、時間の増加がDAGのハウスキーピングによって発生すると予想しています。データフレームをキャッシュすることで、変換を繰り返す必要はなくなりますが、スパークによってその系統を維持する必要があります。

キャッシュされたデータでもなくても、DataFrameをすべて前のものと結合して、実際に複雑なDAGを構築しているようです。各パスはtestLoopです。

checkpointを使用して系列を整理し、無限再帰を防ぐためのチェックを導入することができます。

+0

ありがとうございます!あなたは私に正しい方向を与えます。しかし、私はチェックポイントディレクトリを '' 'spark.sparkContext.setCheckpointDir("/tmp ");' 'で設定し、' '' val resultDf = lastDfList.foldLeft(...)の直後にresultDf.checkpoint() {...}。キャッシュ '' '。問題はまだそこにあり、約8回反復した後にはゆっくりと耐えられなくなります。私は自分のディレクトリ/ tmp上のチェックポイントデータを見ることができますが、彼らは助けに見えませんでした。 –

+0

'toDebugString'の出力を見て、非常に複雑なDAGを作成しています。コードの論理的な複雑さを考えると、遅くなることが予想されます。 – ImDarrenG

+0

私の問題は、DAGが反復回数が多すぎると複雑すぎることですが、APIドキュメントではチェックポイントを使用してデータセットの論理プランを切り捨てることができるため、状況に役立つと思います。しかし、それはありません:(私はdebugStringからの情報を見て、各反復後にDAGがまだ大きくなっていることを説明しています。 –

0

APIおよびcodeによれば、checkpointは元のデータセットを変更する代わりに新しいデータセットを返します。

+1

このリンクは質問に答えるかもしれませんが、答えの本質的な部分をここに含めて参考にしてください。リンクされたページが変更された場合、リンクのみの回答は無効になります。 - [レビューの投稿](/レビュー/低品質の投稿/ 18342577) –

関連する問題