現在11,000ファイルを処理中です。各ファイルは、前のものとの連合となるデータフレームを生成します。以下はコードです大きなデータフレームをスパークで効果的にキャッシュする
var df1 = sc.parallelize(Array(("temp",100))).toDF("key","value").withColumn("Filename", lit("Temp"))
files.foreach(filename => {
val a = filename.getPath.toString()
val m = a.split("/")
val name = m(6)
println("FILENAME: " + name)
if (name == "_SUCCESS") {
println("Cannot Process '_SUCCSS' Filename")
} else {
val freqs=doSomething(a).toDF("key","value").withColumn("Filename", lit(name))
df1=df1.unionAll(freqs)
}
})
最初に、私は11000ファイルにjava.lang.StackOverFlowError
のエラーがあります。その後、私はdf1=df1.unionAll(freqs)
後に次の行を追加します。
df1=df1.cache()
それは問題を解決しますが、各反復の後に、それが遅くなっています。誰かが時間の短縮なしでStackOverflowError
を避けるために何をすべきか教えてください。 ありがとう!
系統が非常に深くなって非効率的になる。あなたは、(スパーク2の)系統を切り詰めるために「チェックポイント」を試みることができます。別の方法として、すべての個別のデータフレームをディスクに書き込むこともできます(ファイル名でパーティション化されたテーブルなど)。 –