5

ZeppelinでSpark bisecting kmmeansアルゴリズムを実行しています。私はいつもかかわらず、このエラーが出るSpark throws java.util.NoSuchElementException:キーが見つかりません:67

//I transform my data using the TF-IDF algorithm 

val idf = new IDF(minFreq).fit(data) 
val hashIDF_features = idf.transform(dbTF)  

//and parse the transformed data to the clustering algorithm. 

val bkm = new BisectingKMeans().setK(100).setMaxIterations(2) 
val model = bkm.run(hashIDF_features) 
val cluster_rdd = model.predict(hashIDF_features) 

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 270.0 failed 4 times, most recent failure: Lost task 0.3 in stage 270.0 (TID 126885, IP): java.util.NoSuchElementException: key not found: 67 
    at scala.collection.MapLike$class.default(MapLike.scala:228) 
    at scala.collection.AbstractMap.default(Map.scala:58) 
    at scala.collection.MapLike$class.apply(MapLike.scala:141) 
    at scala.collection.AbstractMap.apply(Map.scala:58) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply$mcDJ$sp(BisectingKMeans.scala:338) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply(BisectingKMeans.scala:337) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply(BisectingKMeans.scala:337) 
    at scala.collection.TraversableOnce$$anonfun$minBy$1.apply(TraversableOnce.scala:231) 
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) 
    at scala.collection.immutable.List.foldLeft(List.scala:84) 
    at scala.collection.LinearSeqOptimized$class.reduceLeft(LinearSeqOptimized.scala:125) 
    at scala.collection.immutable.List.reduceLeft(List.scala:84) 
    at scala.collection.TraversableOnce$class.minBy(TraversableOnce.scala:231) 
    at scala.collection.AbstractTraversable.minBy(Traversable.scala:105) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1.apply(BisectingKMeans.scala:337) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1.apply(BisectingKMeans.scala:334) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1882) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1953) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:933) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$.org$apache$spark$mllib$clustering$BisectingKMeans$$summarize(BisectingKMeans.scala:261) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$run$1.apply$mcVI$sp(BisectingKMeans.scala:194) 
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
    at org.apache.spark.mllib.clustering.BisectingKMeans.run(BisectingKMeans.scala:189) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:95) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:97) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:99) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:101) 

イムスパーク1.6.1を使用します。 興味深いことに、このアルゴリズムをスタンドアロンアプリケーションで実行するとエラーは発生しませんが、私はこれをZeppelinで取得します。それに加えて、入力は外部アルゴリズムによって計算されているので、書式設定の問題だとは思わない。何か案は?

編集:システムを少量のクラスタで再度テストしましたが、エラーは発生しません。大規模なクラスタ値に対してアルゴリズムが壊れるのはなぜですか?

答えて

1

私はこの問題がclosureであると考えています。アプリケーションをローカルで実行すると、すべてが同じメモリ/プロセスで実行されている可能性があります。他のメモリ/プロセスで実行されているかもしれないclousreからローカル変数にアクセスしようとしていないことを確認してください。 Thisが問題を解決するのに役立ちます。

+1

私は関数を閉じて確実に閉じても問題は解決しません。 – Mnemosyne

1

私も同じ問題に直面しています。私はこの問題をSpark JIRAに報告しましたが、何の反応もありませんでした。 https://issues.apache.org/jira/browse/SPARK-16473

+0

誰にもこの問題の解決策がありますか?またはそれの根本的な原因 –

+0

私はどのように正確にわからないが、問題はメモリ関連であると思う。私はより強力なクラスターに移動し、上限が増加しました。私は150-200クラスターに行くことができたし、私は再び同じエラーを打つだろう。おそらくアルゴリズムのバグです。 BKMはKMよりもずっと速いので、残念です。 – Mnemosyne

+0

誰かが、BisectingKmeans.scalaクラスのこれまでの変更をいつsbtリポジトリに反映させるのかを知っていますか?私は更新された関数が必要ですが、最後のsbtバージョンのmllib(2.11)では変更がありません。 – eifersucht

関連する問題