2016-11-02 8 views
0

私はPython 2.7をjupyterノートブックで実行しています。私は、次のpyspark RDDオブジェクトを持っている:私はfile_rdd.values().reduce(max)を実行するとPyspark reduceはいくつかの関数を受け取りますが、他の関数は受け取りません。

file_rdd.collect() 

[(u'Jane', 2), 
(u'Jane', 1), 
(u'Pete', 20), 
(u'Tyler', 3), 
(u'Duncan', 4), 
(u'Yuki', 5), 
(u'Duncan', 6), 
(u'Duncan', 4), 
(u'Duncan', 5)] 

は、私が20を取得します。私はfile_rdd.values().reduce(sum)を実行したときしかし、私はエラーを取得する:

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-61-eb9dd2b679d4> in <module>() 
----> 1 file_rdd.values().reduce(sum) 

/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/rdd.pyc in reduce(self, f) 
    800    yield reduce(f, iterator, initial) 
    801 
--> 802   vals = self.mapPartitions(func).collect() 
    803   if vals: 
    804    return reduce(f, vals) 

/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/rdd.pyc in collect(self) 
    774   """ 
    775   with SCCallSiteSync(self.context) as css: 
--> 776    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    777   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    778 

/Users/zacharythomas/anaconda/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args) 
    1131   answer = self.gateway_client.send_command(command) 
    1132   return_value = get_return_value(
-> 1133    answer, self.gateway_client, self.target_id, self.name) 
    1134 
    1135   for temp_arg in temp_args: 

/Users/zacharythomas/anaconda/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 
    317     raise Py4JJavaError(
    318      "An error occurred while calling {0}{1}{2}.\n". 
--> 319      format(target_id, ".", name), value) 
    320    else: 
    321     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 30.0 failed 1 times, most recent failure: Lost task 1.0 in stage 30.0 (TID 59, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main 
    process() 
    File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/rdd.py", line 800, in func 
    yield reduce(f, iterator, initial) 
TypeError: 'int' object is not iterable 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:911) 
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453) 
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 
    at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main 
    process() 
    File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/rdd.py", line 800, in func 
    yield reduce(f, iterator, initial) 
TypeError: 'int' object is not iterable 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 

それはなぜですか? file_rdd.values().reduce(lambda x, y: x+y)を実行すると、希望の結果が得られます50

私には、sumは繰り返し可能なオブジェクトが必要であるのに対して、.reduceはバイナリ操作を必要としているからです。それでなぜmaxがうまくいくのか説明していないのでしょうか?

答えて

1

実際には、最後の段落から "誰かが" 非常に適切である;-)

reduceは二項演算を望んでいるとmaxは、そのうちの一つである:

>>> max(1,2) 
2 

しかしsumではありません。

>>> sum(1,2) 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
TypeError: 'int' object is not iterable 

operator module documentationを参照すると、addバイナリ演算子:

>>> from operator import add 
>>> add(1,2) 
3 
>>> file_rdd.values().reduce(add) 
50 
関連する問題