2017-02-15 10 views
0

有効なチェックポイントでpysparkストリーミングを使用します。 最初の打ち上げは成功したが時にエラーで再起動がクラッシュ:それは私のミスで申し訳ありませんが、スパークコンテキストaddPyFileによって追加pysparkストリーミングチェックポイントから復元

INFO scheduler.DAGScheduler: ResultStage 6 (runJob at PythonRDD.scala:441) failed in 1,160 s due to Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 86, h-1.e-contenta.com, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File"/data1/yarn/nm/usercache/appcache/application_1481115309392_0229/container_1481115309392_0229_01_000003/pyspark.zip/pyspark/worker.py", line 163, in main func, profiler, deserializer, serializer = read_command(pickleSer, infile) File"/data1/yarn/nm/usercache/appcache/application_1481115309392_0229/container_1481115309392_0229_01_000003/pyspark.zip/pyspark/worker.py", line 56, in read_command command = serializer.loads(command.value) File"/data1/yarn/nm/usercache/appcache/application_1481115309392_0229/container_1481115309392_0229_01_000003/pyspark.zip/pyspark/serializers.py", line 431, in loads return pickle.loads(obj, encoding=encoding) ImportError: No module named ...

Pythonモジュール()

def create_streaming(): 
""" 
Create streaming context and processing functions 
:return: StreamingContext 
""" 
sc = SparkContext(conf=spark_config) 
zip_path = zip_lib(PACKAGES, PY_FILES) 
sc.addPyFile(zip_path) 
ssc = StreamingContext(sc, BATCH_DURATION) 

stream = KafkaUtils.createStream(ssc=ssc, zkQuorum=','.join(ZOOKEEPER_QUORUM), 
             groupId='new_group', 
             topics={topic: 1}) 

stream.checkpoint(BATCH_DURATION) 
stream = stream \ 
    .map(lambda x: process(ujson.loads(x[1]), geo_data_bc_value)) \ 
    .foreachRDD(lambda_log_writer(topic, schema_bc_value)) 

ssc.checkpoint(STREAM_CHECKPOINT) 
return ssc 

if __name__ == '__main__': 
ssc = StreamingContext.getOrCreate(STREAM_CHECKPOINT, lambda: create_streaming()) 
ssc.start() 
ssc.awaitTermination() 
+0

はこれを試してみてください? ssc.getOrCreateまたはssc.getOrCreateの後? –

+0

ストリーミングコンテキストを返すメソッドで: – koddyf

+0

ssc = StreamingContext.getOrCreateの後に追加のssc.addPyFileを設定します。 –

答えて

0

。あなたがssc.addPyFileを設定するのです

if __name__ == '__main__': 
    ssc = StreamingContext.getOrCreate('', None) 
    ssc.sparkContext.addPyFile() 

    ssc.start() 
    ssc.awaitTermination() 
+0

ありがとう、これは本当に助けになった! スパークコンテキストの作成時にこれがなぜ機能していないのかは、私にとって謎のままですが... – koddyf

関連する問題