私のクラスに不変のマップがあります。ローカルモードでコードを実行すると問題はなく、マップのすべてのキーにアクセスできます。しかし、私がクラスターモードでコードを実行すると、ノードはマップ内でキーを見つけられないというエラーをスローします。マップをクラスタモードで使用してスパークする
これまで私が試したことは次のとおりです。
- 不変のマップをクラスタ経由でブロードキャストします。
broadcast = sc.broadcast(my_immutable_map)
ペアRDD
my_map_rdd = sc.parallelize(my_immutable_map.toSeq)
として-Parallelizeマップ私はログを調べると、私はキーが見つからない例外を参照してください。 次のように私のエラースタックトレースは次のとおりです。
Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 15.0 failed 4 times, most recent failure: Lost task 1.3 in stage 15.0 (TID 25, datanode1.big.com): java.util.NoSuchElementException: key not found: 905053199731
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at havelsan.CDRGenerator$.generate_random_target(CDRGenerator.scala:95)
at havelsan.CDRGenerator$$anonfun$main$2$$anonfun$6.apply(CDRGenerator.scala:167)
at havelsan.CDRGenerator$$anonfun$main$2$$anonfun$6.apply(CDRGenerator.scala:165)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1197)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1205)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
あなたは、してくださいスパークマップを配布する方法を説明し、どのようにいくつかのノードがこのマップ内のいくつかのキーを見つけることができない可能性があることはできますか? Btw私のスパークバージョンは1.6.0です
私は何が欠けていますか?
UPDATE
この部分は、ドライバーにマップを初期化するためのものです。
...
var pd = sc.textFile("hdfs://...")
my_immutable_map = pd.map(line => line.split(":")).map{ line => (line(0), line(1).split(","))}.collectAsMap
...
broadcast = sc.broadcast(my_immutable_map)
my_map_rdd = sc.parallelize(my_immutable_map.toSeq)
これはエラーが発生した部分です。
def my_func(key:String):String={
...
my_value = broadcast.value(key)
...
}
my_funcは次のようにマップ内で呼び出されます。
my_another_rdd.map{ line =>
val key = line.split(",")(0)
my_func(key)
}
スパークバージョン?? – banjara
私のスパークバージョンは1.6.0です –
もっとコードを入力してください。マップがかなり小さい場合、最初のアプローチは正しいものです。 –