
PySpark RuntimeError: Set changed size during iteration

I am running a PySpark streaming script and got the error "RuntimeError: Set changed size during iteration", raised at my line "if len(rdd.take(1)) > 0:". I'm wondering what the real cause is and what exactly went wrong here. Any help is greatly appreciated.

Thank you!

17/03/23 21:54:17 INFO DStreamGraph: Updated checkpoint data for time 1490320070000 ms 
17/03/23 21:54:17 INFO JobScheduler: Finished job streaming job 1490320072000 ms.0 from job set of time 1490320072000 ms 
17/03/23 21:54:17 INFO JobScheduler: Starting job streaming job 1490320072000 ms.1 from job set of time 1490320072000 ms 
17/03/23 21:54:17 ERROR JobScheduler: Error running job streaming job 1490320072000 ms.0 
org.apache.spark.SparkException: An exception was raised by Python: 
Traceback (most recent call last): 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", 

line 65, in call r = self.func(t, *rdds) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in func = lambda t, rdd: old_func(rdd) File "/home/richard/Documents/spark_code/with_kafka/./mongo_kafka_spark_script.py", line 96, in _compute_glb_max if len(rdd.take(1)) > 0: File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1343, in take res = self.context.runJob(self, takeUpToNumLeft, p) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 965, in runJob port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2439, in _jrdd self._jrdd_deserializer, profiler) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2372, in _wrap_function pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2363, in _prepare_for_python_RDD broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars] RuntimeError: Set changed size during iteration

at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95) 
    at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78) 
    at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) 
    at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at scala.util.Try$.apply(Try.scala:192) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Traceback (most recent call last): 
    File "/home/richard/Documents/spark_code/with_kafka/./mongo_kafka_spark_script.py", 

line 224, in ssc.awaitTermination(); File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 206, in awaitTermination File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o38.awaitTermination. : org.apache.spark.SparkException: An exception was raised by Python: Traceback (most recent call last): File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call r = self.func(t, *rdds) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in func = lambda t, rdd: old_func(rdd) File "/home/richard/Documents/spark_code/with_kafka/./mongo_kafka_spark_script.py", line 96, in _compute_glb_max if len(rdd.take(1)) > 0: File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1343, in take res = self.context.runJob(self, takeUpToNumLeft, p) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 965, in runJob port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2439, in _jrdd self._jrdd_deserializer, profiler) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2372, in _wrap_function pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2363, in _prepare_for_python_RDD broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars] RuntimeError: Set changed size during iteration

at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95) 
    at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78) 
    at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) 
    at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at scala.util.Try$.apply(Try.scala:192) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
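
Judging from the traceback, the error is raised in _prepare_for_python_RDD while PySpark iterates over sc._pickled_broadcast_vars, a driver-side set that is read on every job submission and mutated whenever a broadcast variable is created. Below is a hypothetical sketch of the kind of per-batch broadcast pattern that can trigger this; only the line "if len(rdd.take(1)) > 0:" is taken from the traceback, and all other names are illustrative:

    glb_max = sc.broadcast(0.0)  # driver-side state shared with executors

    def _compute_glb_max(rdd):
        global glb_max
        if len(rdd.take(1)) > 0:       # submits a job, which snapshots the broadcast set
            batch_max = rdd.max()
            if batch_max > glb_max.value:
                # Re-broadcasting every batch mutates the driver's internal set of
                # broadcast variables while concurrent streaming jobs iterate over it.
                glb_max = sc.broadcast(batch_max)

    dstream.foreachRDD(_compute_glb_max)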

I changed that part of the code to "if not rdd.isEmpty():" and got the same error. – rxie


"RuntimeError: Set changed size during iteration" means you are dealing with an object of type set, and its size changed while you were iterating over it. Python forbids adding or removing elements of a set while it is being iterated, which is why this error is thrown. Hopefully you can generalize from this similar question: http://stackoverflow.com/questions/22846719/recursion-how-to-avoid-python-set-changed-set-during-iteration-runtimeerror – ralston
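
To make this concrete, here is a minimal standalone example that reproduces the error and shows the usual fix of iterating over a snapshot of the set:

    s = {1, 2, 3}
    try:
        for x in s:
            s.add(x + 10)      # mutating the set mid-iteration changes its size
    except RuntimeError as e:
        print(e)               # Set changed size during iteration

    s = {1, 2, 3}
    for x in list(s):          # iterate over a copy; the set itself may now change
        s.add(x + 10)
    print(sorted(s))           # [1, 2, 3, 11, 12, 13]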


Thanks for your suggestion! – rxie

Answers


Creating broadcast variables inside an iteration does not look like best practice. Whenever you need stateful data, use updateStateByKey instead.
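
A minimal sketch of that approach, keeping the running global maximum in Spark-managed state rather than a re-created broadcast variable. The socket input source, key name, batch interval, and checkpoint path here are assumptions for illustration; substitute your Kafka stream:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="GlobalMaxSketch")
    ssc = StreamingContext(sc, 2)            # 2-second batches
    ssc.checkpoint("/tmp/spark_checkpoint")  # updateStateByKey requires checkpointing

    def update_max(new_values, current_max):
        # new_values: values seen for this key in the current batch
        # current_max: state carried over from earlier batches (None at first)
        candidates = list(new_values) + ([current_max] if current_max is not None else [])
        return max(candidates) if candidates else None

    # Assumed input source; replace with your Kafka DStream.
    lines = ssc.socketTextStream("localhost", 9999)
    global_max = (lines.map(lambda x: ("glb_max", float(x)))
                       .updateStateByKey(update_max))
    global_max.pprint()

    ssc.start()
    ssc.awaitTermination()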
