2017-10-11

I am running a program that uses Spark parallelization multiple times. The program works fine for the first few iterations, but then crashes with a memory issue. I am using Spark 2.2.0 with Python 2.7, and I am running my tests on an AWS EC2 instance with 30 GB of memory.

Here is my Spark configuration:

conf = pyspark.SparkConf() 
conf.set("spark.executor.memory", '4g') 
conf.set('spark.executor.cores', '16') 
conf.set('spark.cores.max', '16') 
conf.set("spark.driver.memory",'4g') 
conf.setMaster("local[*]") 

And here is my error log:

    Traceback (most recent call last):
      File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1982, in wsgi_app
        response = self.full_dispatch_request()
      File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1614, in full_dispatch_request
        rv = self.handle_user_exception(e)
      File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1517, in handle_user_exception
        reraise(exc_type, exc_value, tb)
      File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1612, in full_dispatch_request
        rv = self.dispatch_request()
      File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1598, in dispatch_request
        return self.view_functions[rule.endpoint](**req.view_args)
      File "C:/Users/Administrator/Desktop/Flex_Api_Post/flex_api_post_func_spark_setup.py", line 152, in travel_time_est
        count = ssc.parallelize(input_json).map(lambda j: flex_func(j)).collect()
      File "C:\ProgramData\Anaconda2\lib\site-packages\pyspark\rdd.py", line 809, in collect
        port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
      File "C:\ProgramData\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "C:\ProgramData\Anaconda2\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
        format(target_id, ".", name), value)
    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 13.0 failed 1 times, most recent failure: Lost task 7.0 in stage 13.0 (TID 215, localhost, executor driver): org.apache.spark.api.python.PythonException:
    Traceback (most recent call last):
      File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 166, in main
      File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 57, in read_command
      File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 454, in loads
        return pickle.loads(obj)
    MemoryError

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

    Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Unknown Source)
    Caused by: org.apache.spark.api.python.PythonException:
    Traceback (most recent call last):
      File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 166, in main
      File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 57, in read_command
      File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 454, in loads
        return pickle.loads(obj)
    MemoryError

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more

Answer

First, let me explain a bit about how PySpark works.

When you use PySpark with 16 cores per worker, you are asking Spark to start 16 Python instances in parallel for each JVM worker.

(diagram: one JVM worker communicating with 16 parallel Python instances)

So, according to your configuration, you are requesting workers with 4 GB each, and each one runs with 16 cores, as illustrated in the diagram. This creates a structure of one JVM that opens 16 pipes, with 16 Python instances running in parallel. The error you are facing means there is not enough memory left to run those Python processes.
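To see why this multiplies, here is a small stdlib-only sketch of the arithmetic; the payload size is illustrative (a stand-in for whatever data the mapped function captures), not taken from the question:

```python
import pickle

# Each parallel Python worker unpickles its own copy of the serialized task
# (the function plus anything its closure captures), so peak memory scales
# with the number of concurrent workers, not just with one copy.
big = list(range(1000000))   # hypothetical large object captured by the closure
payload = pickle.dumps(big)

per_worker = len(payload)    # bytes each Python worker must deserialize
workers = 16                 # matches spark.executor.cores in the question
total = per_worker * workers
print("%.1f MB per worker, ~%.1f MB across %d workers"
      % (per_worker / 1e6, total / 1e6, workers))
```

With 16 workers deserializing at once, even a moderately sized captured object can exhaust the memory available to the Python side, which is exactly where the `pickle.loads` `MemoryError` in the traceback occurs.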

You probably need to reduce the number of cores per worker so the processes can fit in memory, or add more memory.
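For example, a more conservative configuration might look like the sketch below. The exact values are illustrative and should be tuned for your workload; `spark.python.worker.memory` is a real Spark setting that caps how much memory each Python worker uses for aggregation before spilling to disk:

```python
import pyspark

# Illustrative only: fewer concurrent Python workers per JVM, plus a cap on
# each Python worker's memory before it spills to disk.
conf = pyspark.SparkConf()
conf.set("spark.executor.memory", "4g")
conf.set("spark.executor.cores", "4")          # fewer Python instances in parallel
conf.set("spark.cores.max", "4")
conf.set("spark.driver.memory", "4g")
conf.set("spark.python.worker.memory", "1g")   # per-Python-worker spill threshold
conf.setMaster("local[4]")
sc = pyspark.SparkContext(conf=conf)
```

With 4 cores instead of 16, only 4 Python instances compete for the instance's memory at any moment, at the cost of less parallelism per iteration.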

For more details, check here.