
PySpark cannot create a DataFrame with 1,000,000+ rows

I have a CSV file of about 38 MB with 1,017,210 rows and 10 columns. I am running Spark in standalone mode on 64-bit Windows with 8 GB of RAM, and I am trying to read that CSV into a PySpark DataFrame like this:

trainRaw = sc.textFile("D:/Rossmann/train/train.csv").map(lambda line:line.split(",")) 

trainRaw_df = trainRaw.toDF(["Store","DayOfWeek","Date","Sales","Customers","Open","Promo","StateHoliday","SchoolHoliday"]).first() 

However, I am getting the following error:

16/08/17 10:27:41 INFO SparkContext: Starting job: runJob at PythonRDD.scala:393 
    16/08/17 10:27:41 INFO DAGScheduler: Got job 12 (runJob at PythonRDD.scala:393) with 1 output partitions 
    16/08/17 10:27:41 INFO DAGScheduler: Final stage: ResultStage 12 (runJob at PythonRDD.scala:393) 
    16/08/17 10:27:41 INFO DAGScheduler: Parents of final stage: List() 
    16/08/17 10:27:41 INFO DAGScheduler: Missing parents: List() 
    16/08/17 10:27:41 INFO DAGScheduler: Submitting ResultStage 12 (PythonRDD[38] at RDD at PythonRDD.scala:43) 
    16/08/17 10:27:41 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 5.2 KB 
    16/08/17 10:27:41 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 3.3 KB 
    16/08/17 10:27:41 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on localhost:49516 (size: 3.3 KB 
    16/08/17 10:27:41 INFO SparkContext: Created broadcast 19 from broadcast at DAGScheduler.scala:1006 
    16/08/17 10:27:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 12 (PythonRDD[38] at RDD at PythonRDD.scala:43) 
    16/08/17 10:27:41 INFO TaskSchedulerImpl: Adding task set 12.0 with 1 tasks 
    16/08/17 10:27:41 INFO TaskSetManager: Starting task 0.0 in stage 12.0 (TID 14 
    16/08/17 10:27:41 INFO Executor: Running task 0.0 in stage 12.0 (TID 14) 
    16/08/17 10:27:41 INFO HadoopRDD: Input split: file:/D:/Rossmann/train/train.csv:0+19028976 
    16/08/17 10:27:42 INFO PythonRunner: Times: total = 1328 
    16/08/17 10:27:42 ERROR PythonRunner: Python worker exited unexpectedly (crashed) 
    java.net.SocketException: Connection reset by peer: socket write error 
      at java.net.SocketOutputStream.socketWrite0(Native Method) 
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
      at java.net.SocketOutputStream.write(SocketOutputStream.java:153) 
      at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
      at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) 
      at java.io.DataOutputStream.write(DataOutputStream.java:107) 
      at java.io.FilterOutputStream.write(FilterOutputStream.java:97) 
      at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622) 
      at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442) 
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
      at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
      at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
      at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
      at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) 
      at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 
    16/08/17 10:27:42 ERROR PythonRunner: This may have been caused by a prior exception: 
    java.net.SocketException: Connection reset by peer: socket write error 
      at java.net.SocketOutputStream.socketWrite0(Native Method) 
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
      at java.net.SocketOutputStream.write(SocketOutputStream.java:153) 
      at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
      at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) 
      at java.io.DataOutputStream.write(DataOutputStream.java:107) 
      at java.io.FilterOutputStream.write(FilterOutputStream.java:97) 
      at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622) 
      at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442) 
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
      at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
      at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
      at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
      at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) 
      at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 
    16/08/17 10:27:42 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 14) 
    java.net.SocketException: Connection reset by peer: socket write error 
      at java.net.SocketOutputStream.socketWrite0(Native Method) 
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
      at java.net.SocketOutputStream.write(SocketOutputStream.java:153) 
      at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
      at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) 
      at java.io.DataOutputStream.write(DataOutputStream.java:107) 
      at java.io.FilterOutputStream.write(FilterOutputStream.java:97) 
      at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622) 
      at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442) 
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
      at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
      at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
      at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
      at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) 
      at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 
    16/08/17 10:27:42 WARN TaskSetManager: Lost task 0.0 in stage 12.0 (TID 14 
      at java.net.SocketOutputStream.socketWrite0(Native Method) 
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
      at java.net.SocketOutputStream.write(SocketOutputStream.java:153) 
      at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
      at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) 
      at java.io.DataOutputStream.write(DataOutputStream.java:107) 
      at java.io.FilterOutputStream.write(FilterOutputStream.java:97) 
      at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622) 
      at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442) 
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
      at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
      at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
      at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
      at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) 
      at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 

    16/08/17 10:27:42 ERROR TaskSetManager: Task 0 in stage 12.0 failed 1 times; aborting job 
    16/08/17 10:27:42 INFO TaskSchedulerImpl: Removed TaskSet 12.0 
    16/08/17 10:27:42 INFO TaskSchedulerImpl: Cancelling stage 12 
    16/08/17 10:27:42 INFO DAGScheduler: ResultStage 12 (runJob at PythonRDD.scala:393) failed in 1.454 s 
    16/08/17 10:27:42 INFO DAGScheduler: Job 12 failed: runJob at PythonRDD.scala:393 
    Traceback (most recent call last): 
     File "<stdin>" 
     File "D:\spark-1.6.1-bin-hadoop2.6\python\pyspark\rdd.py" 
     rs = self.take(1) 
     File "D:\spark-1.6.1-bin-hadoop2.6\python\pyspark\rdd.py" 
     res = self.context.runJob(self 
     File "D:\spark-1.6.1-bin-hadoop2.6\python\pyspark\context.py" 
     port = self._jvm.PythonRDD.runJob(self._jsc.sc() 
     File "D:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py" 
     File "D:\spark-1.6.1-bin-hadoop2.6\python\pyspark\sql\utils.py" 
     return f(*a 
     File "D:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\protocol.py" 
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times 
    ): java.net.SocketException: Connection reset by peer: socket write error 
      at java.net.SocketOutputStream.socketWrite0(Native Method) 
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
      at java.net.SocketOutputStream.write(SocketOutputStream.java:153) 
      at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
      at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) 
      at java.io.DataOutputStream.write(DataOutputStream.java:107) 
      at java.io.FilterOutputStream.write(FilterOutputStream.java:97) 
      at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622) 
      at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442) 
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
      at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
      at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
      at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
      at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) 
      at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 

    Driver stacktrace: 
      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
      at scala.Option.foreach(Option.scala:236) 
      at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
      at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
      at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393) 
      at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala) 
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
      at java.lang.reflect.Method.invoke(Method.java:498) 
      at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
      at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
      at py4j.Gateway.invoke(Gateway.java:259) 
      at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
      at py4j.commands.CallCommand.execute(CallCommand.java:79) 
      at py4j.GatewayConnection.run(GatewayConnection.java:209) 
      at java.lang.Thread.run(Thread.java:745) 
    Caused by: java.net.SocketException: Connection reset by peer: socket write error 
      at java.net.SocketOutputStream.socketWrite0(Native Method) 
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
      at java.net.SocketOutputStream.write(SocketOutputStream.java:153) 
      at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
      at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) 
      at java.io.DataOutputStream.write(DataOutputStream.java:107) 
      at java.io.FilterOutputStream.write(FilterOutputStream.java:97) 
      at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622) 
      at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442) 
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
      at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
      at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
      at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
      at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) 
      at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 

    >>> 16/08/17 10:32:03 INFO BlockManagerInfo: Removed broadcast_17_piece0 on localhost:49516 in memory (size: 3.3 KB 
    16/08/17 10:32:03 INFO BlockManagerInfo: Removed broadcast_19_piece0 on localhost:49516 in memory (size: 3.3 KB 
    16/08/17 10:32:03 INFO ContextCleaner: Cleaned accumulator 14 
    16/08/17 10:32:03 INFO BlockManagerInfo: Removed broadcast_18_piece0 on localhost:49516 in memory (size: 6.1 KB 
    16/08/17 10:32:03 INFO ContextCleaner: Cleaned accumulator 13 
    16/08/17 10:32:03 INFO ContextCleaner: Cleaned accumulator 12 
    16/08/17 10:32:03 INFO BlockManagerInfo: Removed broadcast_16_piece0 on localhost:49516 in memory (size: 3.7 KB 
    16/08/17 10:32:03 INFO ContextCleaner: Cleaned accumulator 11 
    16/08/17 10:32:03 INFO BlockManagerInfo: Removed broadcast_14_piece0 on localhost:49516 in memory (size: 3.7 KB 
    16/08/17 10:32:03 INFO ContextCleaner: Cleaned accumulator 10 

First I increased the memory of the worker that loads the data and changed JAVA_OPTS as follows:

export SPARK_MASTER_IP=127.0.0.1
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_WORKER_MEMORY=6g
export SPARK_MEM=6g
export SPARK_DAEMON_MEMORY=6g
export SPARK_JAVA_OPTS="-Dspark.executor.memory=6g -Dspark.storage.memoryFraction=0.66 -Dspark.serializer=org.apache.spark.serializer.JavaSerializer -Dspark.executor.memory=6g -Dspark.locality.wait=60000000"
export JAVA_OPTS="-Xms6G -Xmx6G"

But none of this helped. How can I deal with this kind of memory problem? Any suggestions would be appreciated.


You don't seem to have posted the first error; there should be an earlier one in the log. – marmouset


Does 'trainRaw.first()' work? – ShuaiYuan

Answers


Does trainRaw.show() also fail? If not, I would suggest passing a proper schema:

from pyspark.sql.types import *

schema = StructType([StructField('art_Store', StringType(), True),
                     ...])   # one StructField per remaining column

print trainRaw.toDF(schema).first()
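
For reference, a minimal sketch of what the full schema could look like, assuming the nine column names used in the question and leaving every field as a string (the values coming out of line.split(",") are plain strings, so numeric types would need an explicit conversion step):

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema built from the column names used in the question;
# every field is a nullable StringType because splitting the text lines yields strings.
columns = ["Store", "DayOfWeek", "Date", "Sales", "Customers",
           "Open", "Promo", "StateHoliday", "SchoolHoliday"]
schema = StructType([StructField(name, StringType(), True) for name in columns])

trainRaw = sc.textFile("D:/Rossmann/train/train.csv").map(lambda line: line.split(","))
trainRaw_df = trainRaw.toDF(schema)
print trainRaw_df.first()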

There is no 'RDD.show()'. – ShuaiYuan


Try '.first()' instead. – marmouset
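
As the comments point out, first() and take() exist on RDDs while show() is only available on DataFrames, so a quick sanity check on the raw RDD before calling toDF() might look like this:

# Inspect the parsed RDD directly; this avoids building a DataFrame at all.
trainRaw = sc.textFile("D:/Rossmann/train/train.csv").map(lambda line: line.split(","))

print trainRaw.first()   # the first parsed line, as a list of strings (typically the CSV header row)
print trainRaw.take(5)   # a small sample without materializing the whole file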
