2016-08-06 15 views
-1

私は酸洗エラーを見ています:PySpark PicklingError

以下

Could not pickle object as excessively deep recursion required.

はバックトレースです:ここでは

Traceback (most recent call last): 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call 
    r = self.func(t, *rdds) 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in 
    func = lambda t, rdd: old_func(rdd) 
    if rdd.count() > 0: 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1006, in count 
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 997, in sum 
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 871, in fold 
    vals = self.mapPartitions(func).collect() 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 773, in collect 
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2388, in _jrdd 
    pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self) 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2308, in _prepare_for_python_RDD 
    pickled_command = ser.dumps(command) 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps 
    return cloudpickle.dumps(obj, 2) 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 646, in dumps 
    cp.dump(obj) 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 111, in dump 
    raise pickle.PicklingError(msg) 
    PicklingError: Could not pickle object as excessively deep recursion required. 

は、エラーの原因となった私のコードの旅館ハイレベルの一部です:

sc = SparkContext(appName="my_app") 

ssc = StreamingContext(sc, 1) 

kafka_stream = KafkaUtils.createDirectStream(ssc, full_topic_list, kafka_params, fromOffsets=offset_dict) 

messages = kafka_stream.map(lambda (k, v): json.loads(v)) 

messages.foreachRDD(lambda rdd: process(rdd, topic_list, sqlcontext)) 

私のプロセス関数には、エラーをスローするrddカウント:if topic_rdd.count() > 0があります。

答えて

0

分散機能(マップ、縮小など)でRDDを処理してRDDを処理することはできません。

+0

ありがとうcftarnas。 RDDに合格すれば、rddの機能を実行する最善の方法は何でしょうか。 – ling

+0

@ling:あなたは本当にRDDを渡すことはできません、彼らはpickleableではありません。あなたの全体的な問題を解決する方法は、あなたがしようとしていることによりますが、より詳細なコードスニペットが役に立ちます。 topic_rdd.count()を事前に計算してから、カウント自体を渡すだけで簡単にできます。 – cftarnas