シングルトン変数にThreadLocalを使用すると、Spark Dataframe処理フレームワークでスレッドセーフなものになりますか? Breeze fourierTr関数はThreadLocalを使用しており、問題が発生しているようです。Spark Dataframe処理ではScala Breezeパッケージのスレッドは安全ですか?
私は多次元テーブルをアセンブルし、さまざまな次元でFFTを計算するためのアプリケーションを構築しています。
val r = df.rdd.flatMap{ row =>
// scrub the input, format data into coordinates with a value
// create a key corresponding to a slice through the data
// that will get processed in the next step
}
.groupByKey.flatMap{ case(sliceKey, coordinateList) =>
// note the vector length is variable
val buf = new Array[Complex](lengthOfVector)
// fill buffer with values from data structure slice
fourierTr(new DenseVector(buf))
}
注:これは疑似コードです。私は簡潔な例を作るために実際のコードの多くを取り除いた。
重要な点は、fourierTrの呼び出しです。私はこれを私のローカル開発マシンで走らせたところ、すべてうまくいきました。私は期待した結果を得ました。 。(つまり、使用してまず
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 10.0 failed 1 times, most recent failure: Lost task 2.0 in stage 10.0 (TID 36, localhost): java.lang.ArrayIndexOutOfBoundsException: 12
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.passfg(DoubleFFT_1D.java:3843)
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.cfftf(DoubleFFT_1D.java:3390)
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.complexForward(DoubleFFT_1D.java:189)
at edu.emory.mathcs.jtransforms.fft.DoubleFFT_1D.complexForward(DoubleFFT_1D.java:161)
at breeze.signal.fourierTr$$anon$5.apply(fourierTr.scala:69)
at breeze.signal.fourierTr$$anon$5.apply(fourierTr.scala:62)
at breeze.generic.UFunc$class.apply(UFunc.scala:48)
at breeze.signal.fourierTr$.apply(fourierTr.scala:25)
私はこれが原因私の開発マシンとクラスタ間のパッケージのバージョンの違いであってよいと思った。私は大きな、マルチコアマシンに移動する場合しかし、私は次の例外が発生しましたAWS)。関連するすべてのjarバージョンが一致していることを確認した後も、私は同じ問題を抱えていました。それから私は、アプリケーションが、私は
spark-submit --master local[1] ...
でそれを起動した場合、私は
spark-submit --master local[2] ...
または任意のノード数2よりも高いとそれを起動した場合しかし、その後、私は例外になるだろう細かい走ったことが決定しました。私はいくつかの記憶が何らかの形で崩壊していると思わせた。そこで、私は図書館の資料を探し始めました。
エントリーポイントは、私はそれが共有変数fft_instD1Dを修正していること...
object JTransformsSupport {
//maintain instance of transform to eliminate repeated initialization
private val fft_instD1D = new ThreadLocal[(Int, DoubleFFT_1D)]
def getD1DInstance(length: Int): DoubleFFT_1D = {
if (fft_instD1D.get != null && length == fft_instD1D.get._1) fft_instD1D.get._2
else {
fft_instD1D.set((length, new DoubleFFT_1D(length)))
fft_instD1D.get()._2
}
}
注意を見つけるfourierTr.scalaJTransformsSupport.scalaでgetD1DInstanceに押し込み
implicit val dvComplex1DFFT : fourierTr.Impl[DenseVector[Complex], DenseVector[Complex]] = {
new fourierTr.Impl[DenseVector[Complex], DenseVector[Complex]] {
def apply(v: DenseVector[Complex]) = {
//reformat for input: note difference in format for input to real fft
val tempArr = denseVectorCToTemp(v)
//actual action
val fft_instance = getD1DInstance(v.length)
fft_instance.complexForward(tempArr) //does operation in place
//reformat for output
tempToDenseVector(tempArr)
}
}
}
です。私は、ThreadLocal型に慣れていませんが、これはクラスのスレッドを安全にするためのものです。しかし、DoubleFFT_1Dオブジェクトをスタック変数としてインスタンス化するようにコードを変更した後、すべての低レベルルーチンを直接呼び出しました(たとえば、私はDoubleFFT_1D.complexForwardというfourierTrを呼び出すのではなく)。
この変更を行った後、Sparkで使用されるノードの数に関係なく例外は発生しなくなりました。だから、フーリエ変換ライブラリによるThreadLocalの使用が原因だと思われます。
Scala/Breeze/Sparkの専門家が私の結論に同意したと思う人がいますか?
正しくない場合は、Spark Dataframe処理のコンテキストでBreeze(具体的にはfourierTr)を正しく使用する方法を提案してください。
それが正しい結論であれば、私はいくつかの質問をフォローアップしてい...
- はそのブリーズ機能は、データフレーム処理パイプライン 内から呼び出すことができますと仮定し、私は間違っていましたか?BreezeがDataframe処理パイプラインから呼び出されることが意図されていない場合、パイプラインから呼び出すことができるようにライブラリをラップする標準的な方法がありますか、一般に行ったことを実行してライブラリの一部共有変数を排除する機能?
- データフレームパイプラインからBreezeを呼び出す予定の場合、 はBreezeライブラリのバグ、またはThreadLocalクラスの 実装のバグのようですか? ... ie。その注意を 私はこれをもたらす必要がありますか?
すごく頼んでくれてありがとう。私はこれをBreeze:github.com/scalanlp/breeze/issuesで提案します。 –