2016-10-27 8 views
11

アプリケーション(Spark 2.0.1)では、この例外が頻繁にポップアップします。 これについて何も見つかりません。 原因は何でしょうか?Spark Listener EventLoggingListenerが例外をスローしました/ ConcurrentModificationException

16/10/27 11:18:24 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception 
java.util.ConcurrentModificationException 
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901) 
    at java.util.ArrayList$Itr.next(ArrayList.java:851) 
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183) 
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45) 
    at scala.collection.TraversableLike$class.to(TraversableLike.scala:590) 
    at scala.collection.AbstractTraversable.to(Traversable.scala:104) 
    at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:294) 
    at scala.collection.AbstractTraversable.toList(Traversable.scala:104) 
    at org.apache.spark.util.JsonProtocol$.accumValueToJson(JsonProtocol.scala:314) 
    at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291) 
    at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291) 
    at scala.Option.map(Option.scala:146) 
    at org.apache.spark.util.JsonProtocol$.accumulableInfoToJson(JsonProtocol.scala:291) 
    at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283) 
    at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) 
    at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
    at org.apache.spark.util.JsonProtocol$.taskInfoToJson(JsonProtocol.scala:283) 
    at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:145) 
    at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:76) 
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:137) 
    at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:157) 
    at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45) 
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36) 
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36) 
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63) 
    at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78) 
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1249) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77) 

EDIT:一つより多くの情報、私たちのアプリケーションは、長時間実行され、潜在的に障害が発生した火花コンテキストから再開するために、我々は2つの「ジョブ」の間SparkBuilder.getOrCreate()メソッドを使用します。これはリスナーとうんざりすることができますか?

答えて

6

Spark 2.0.1SPARK-17816)の既知の問題です。Spark 2.0.2/2.1.0related pull request)で修正されます。

Spark 2.0.2/2.1.0を待たずに例外を取り除くには、latest, unstable spark versionbuild apache-spark manuallyをクローンします。

更新:彼らはSpark 2.0.2をリリースしました!

+2

アップグレード以外にいくつかの回避策がありますか? – rado

+0

私は見つけられませんでしたが、 '' Spark 2.0.2''のリリースで廃止されるべきです –

2

また、Spark 2.0.1にアップグレードして同じ例外が発生しました。私たちは、次のイディオム含むPythonコードのセクションまで原因を狭め:

a = spark_context.textFile('..') 
a = a.map(stuff) 
b = a.filter(stuff).map(stuff) 

を私はスパークでの変数の自己代入して、過去に問題があったが、問題を2.0.1にアップグレードした後、実際に持っています急激に変化し、ConcurrentModification例外が発生しました。

私たちのための修正点は、自己割り当てを行わないようにコードを変更するだけでした。

+0

「Spark 2.0.1」にアップグレードした後、同じエラーメッセージが表示され、自分のコードに変数の自己割り当てがありません。 –

+1

「自己割り当て」は問題ではありません。 'a = a.map(stuff)'と言うときは、変数 'a'に新しいDataFrameを古い' a'から割り当てませんが、古い参照は失われません。なぜなら、結果のDataFrameによって参照されるからです。依存グラフ私はあなたが本当に何か他のものを見ていると思います。 – vy32

関連する問題