キャッシュされた結果をリストに保存し、それぞれの最後のリストのすべてのデータで新しいDataFrameを計算しようとすると問題が発生しました繰り返し。しかし、空のDataFrameを使用して毎回空の結果を取得しても、関数は約8〜12ラウンド後に突然非常に遅くなります。ここでSpark Dataframeが突然古いキャッシュデータを繰り返し過度に再利用すると、急激に遅くなる
は私のコード
testLoop(Nil)
def testLoop(lastDfList:List[DataFrame]){
// do some dummy transformation like union and cache the result
val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf)}.cache
// always get 0, of course
println(resultDf.count)
// benchmark action
benchmark(resultDf.count)
testLoop(resultDf::lastDfList)
}
ベンチマーク結果 1~6 round : < 200ms 7 round : 367ms 8 round : 918ms 9 round : 2476ms 10 round : 7833ms 11 round : 24231ms
である私は、私はすでに空のデータフレームを使用しているためGCまたはブロックの立ち退きが私の場合は問題だと思うが、私ドンはありません何が原因なのか分からない?私はキャッシュや何かの意味を誤解していますか?
ありがとうございます!
spark.sparkContext.setCheckpointDir("/tmp")
testLoop(Nil)
def testLoop(lastDfList:List[DataFrame]){
// do some dummy transformation like union and cache the result
val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf)}.cache
resultDf.checkpoint()
// always get 0, of course
println(resultDf.count)
// benchmark action
benchmark(resultDf.count)
testLoop(resultDf::lastDfList)
}
しかし、それはまだ数回の反復の後に非常に遅くなる:
ImDarrenGのソリューションを読んだ後、私は次のように私のコードを変更しました。ここで
ありがとうございます!あなたは私に正しい方向を与えます。しかし、私はチェックポイントディレクトリを '' 'spark.sparkContext.setCheckpointDir("/tmp ");' 'で設定し、' '' val resultDf = lastDfList.foldLeft(...)の直後にresultDf.checkpoint() {...}。キャッシュ '' '。問題はまだそこにあり、約8回反復した後にはゆっくりと耐えられなくなります。私は自分のディレクトリ/ tmp上のチェックポイントデータを見ることができますが、彼らは助けに見えませんでした。 –
'toDebugString'の出力を見て、非常に複雑なDAGを作成しています。コードの論理的な複雑さを考えると、遅くなることが予想されます。 – ImDarrenG
私の問題は、DAGが反復回数が多すぎると複雑すぎることですが、APIドキュメントではチェックポイントを使用してデータセットの論理プランを切り捨てることができるため、状況に役立つと思います。しかし、それはありません:(私はdebugStringからの情報を見て、各反復後にDAGがまだ大きくなっていることを説明しています。 –