
pyspark: sort inside reduceByKey raises "TypeError: 'int' object is not callable" in <lambda>

I have the following code. For each my_id, I am trying to sort the amount field by the timestamp field:

output_rdd = my_df.rdd.map(lambda r: (r['my_id'], [r['timestamp'], [r['amount']]]))\
         .reduceByKey(lambda a, b: sorted(a+b, key=(a+b)[0]))\
         .map(lambda r: r[1])

However, I get the following error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 4 times, most recent failure: Lost task 0.3 in stage 30.0 (TID 52, ph-hdp-prd-dn02): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/data/0/yarn/nm/usercache/phanalytics-test/appcache/application_1474532589728_2983/container_e203_1474532589728_2983_01_000014/pyspark.zip/pyspark/worker.py", line 172, in main 
    process() 
    File "/data/0/yarn/nm/usercache/analytics-test/appcache/application_1474532589728_2983/container_e203_1474532589728_2983_01_000014/pyspark.zip/pyspark/worker.py", line 167, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/local/spark-latest/python/pyspark/rdd.py", line 2371, in pipeline_func 
    File "/usr/local/spark-latest/python/pyspark/rdd.py", line 2371, in pipeline_func 
    File "/usr/local/spark-latest/python/pyspark/rdd.py", line 317, in func 
    File "/usr/local/spark-latest/python/pyspark/rdd.py", line 1792, in combineLocally 
    File "/data/0/yarn/nm/usercache/phanalytics-test/appcache/application_1474532589728_2983/container_e203_1474532589728_2983_01_000014/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues 
    d[k] = comb(d[k], v) if k in d else creator(v) 
    File "<ipython-input-11-ec09929e01e4>", line 6, in <lambda> 
TypeError: 'int' object is not callable 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Any idea what I missed? Thanks a lot!

Answers

Convert the argument you pass to key into a Python function or a lambda and try again. This approach is also fast, because the key function is called exactly once for each input record.
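To make the difference concrete, here is a minimal plain-Python sketch (the sample list is invented for illustration). sorted() calls key(element) on every element, so passing a plain value instead of a callable produces exactly this TypeError:

pairs = [[3, [300.0]], [1, [100.0]], [2, [200.0]]]  # hypothetical merged values

# Broken, like key=(a+b)[0] in the question: (a+b)[0] evaluates to a value
# (in the question, the int timestamp), not a function, so sorted() fails
# when it tries to call it -- TypeError: 'int' object is not callable.
# sorted(pairs, key=pairs[0])

# Fixed: key is a lambda that extracts the timestamp from each element.
print(sorted(pairs, key=lambda x: x[0]))
# [[1, [100.0]], [2, [200.0]], [3, [300.0]]]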

key needs to be a function. As the note in the Python documentation puts it, the value of the key parameter should be a function that takes a single argument and returns a key to use for sorting. So pass a lambda instead of a value:

...  .reduceByKey(lambda a, b: sorted(a+b, key=lambda x: x[0])) \
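For completeness, a minimal runnable sketch of the corrected pipeline, under some assumptions: the sample DataFrame is invented from the column names in the question, and each record is mapped to a one-element list of [timestamp, amount] pairs so that a + b concatenates into a clean list of pairs (the question's [r['timestamp'], [r['amount']]] would yield a flat list mixing ints and lists, which Python 3 cannot sort):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Hypothetical data mirroring the question's my_id / timestamp / amount columns.
my_df = spark.createDataFrame(
    [(1, 30, 300.0), (1, 10, 100.0), (2, 20, 200.0)],
    ["my_id", "timestamp", "amount"])

output_rdd = my_df.rdd \
    .map(lambda r: (r['my_id'], [[r['timestamp'], r['amount']]])) \
    .reduceByKey(lambda a, b: sorted(a + b, key=lambda x: x[0])) \
    .map(lambda r: r[1])

print(output_rdd.collect())
# e.g. [[[10, 100.0], [30, 300.0]], [[20, 200.0]]] (key order may vary)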