私はサイズが〜38メガバイト、1017210行と10個のファイルを持っています。私は64ビットのWindows OSと8GBのRAMを備えたスタンドアロンモードでsparkを使用しています。私はそのCSVをpysparkデータフレームに読み込もうとしています。そして、私のようにデータフレームに読み込むしようとしていますpysparkが1000000+個の行でデータフレームを作成できません
trainRaw = sc.textFile("D:/Rossmann/train/train.csv").map(lambda line:line.split(","))
:
trainRaw_df = trainRaw.toDF(["Store","DayOfWeek","Date","Sales","Customers","Open","Promo","StateHoliday","SchoolHoliday"]).first()
しかし、として、私はエラーを取得しています:私が持っている
16/08/17 10:27:41 INFO SparkContext: Starting job: runJob at PythonRDD.scala:393
16/08/17 10:27:41 INFO DAGScheduler: Got job 12 (runJob at PythonRDD.scala:393) with 1 output partitions
16/08/17 10:27:41 INFO DAGScheduler: Final stage: ResultStage 12 (runJob at PythonRDD.scala:393)
16/08/17 10:27:41 INFO DAGScheduler: Parents of final stage: List()
16/08/17 10:27:41 INFO DAGScheduler: Missing parents: List()
16/08/17 10:27:41 INFO DAGScheduler: Submitting ResultStage 12 (PythonRDD[38] at RDD at PythonRDD.scala:43)
16/08/17 10:27:41 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 5.2 KB
16/08/17 10:27:41 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 3.3 KB
16/08/17 10:27:41 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on localhost:49516 (size: 3.3 KB
16/08/17 10:27:41 INFO SparkContext: Created broadcast 19 from broadcast at DAGScheduler.scala:1006
16/08/17 10:27:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 12 (PythonRDD[38] at RDD at PythonRDD.scala:43)
16/08/17 10:27:41 INFO TaskSchedulerImpl: Adding task set 12.0 with 1 tasks
16/08/17 10:27:41 INFO TaskSetManager: Starting task 0.0 in stage 12.0 (TID 14
16/08/17 10:27:41 INFO Executor: Running task 0.0 in stage 12.0 (TID 14)
16/08/17 10:27:41 INFO HadoopRDD: Input split: file:/D:/Rossmann/train/train.csv:0+19028976
16/08/17 10:27:42 INFO PythonRunner: Times: total = 1328
16/08/17 10:27:42 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
16/08/17 10:27:42 ERROR PythonRunner: This may have been caused by a prior exception:
java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
16/08/17 10:27:42 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 14)
java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
16/08/17 10:27:42 WARN TaskSetManager: Lost task 0.0 in stage 12.0 (TID 14
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
16/08/17 10:27:42 ERROR TaskSetManager: Task 0 in stage 12.0 failed 1 times; aborting job
16/08/17 10:27:42 INFO TaskSchedulerImpl: Removed TaskSet 12.0
16/08/17 10:27:42 INFO TaskSchedulerImpl: Cancelling stage 12
16/08/17 10:27:42 INFO DAGScheduler: ResultStage 12 (runJob at PythonRDD.scala:393) failed in 1.454 s
16/08/17 10:27:42 INFO DAGScheduler: Job 12 failed: runJob at PythonRDD.scala:393
Traceback (most recent call last):
File "<stdin>"
File "D:\spark-1.6.1-bin-hadoop2.6\python\pyspark\rdd.py"
rs = self.take(1)
File "D:\spark-1.6.1-bin-hadoop2.6\python\pyspark\rdd.py"
res = self.context.runJob(self
File "D:\spark-1.6.1-bin-hadoop2.6\python\pyspark\context.py"
port = self._jvm.PythonRDD.runJob(self._jsc.sc()
File "D:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py"
File "D:\spark-1.6.1-bin-hadoop2.6\python\pyspark\sql\utils.py"
return f(*a
File "D:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\protocol.py"
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times
): java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
>>> 16/08/17 10:32:03 INFO BlockManagerInfo: Removed broadcast_17_piece0 on localhost:49516 in memory (size: 3.3 KB
16/08/17 10:32:03 INFO BlockManagerInfo: Removed broadcast_19_piece0 on localhost:49516 in memory (size: 3.3 KB
16/08/17 10:32:03 INFO ContextCleaner: Cleaned accumulator 14
16/08/17 10:32:03 INFO BlockManagerInfo: Removed broadcast_18_piece0 on localhost:49516 in memory (size: 6.1 KB
16/08/17 10:32:03 INFO ContextCleaner: Cleaned accumulator 13
16/08/17 10:32:03 INFO ContextCleaner: Cleaned accumulator 12
16/08/17 10:32:03 INFO BlockManagerInfo: Removed broadcast_16_piece0 on localhost:49516 in memory (size: 3.7 KB
16/08/17 10:32:03 INFO ContextCleaner: Cleaned accumulator 11
16/08/17 10:32:03 INFO BlockManagerInfo: Removed broadcast_14_piece0 on localhost:49516 in memory (size: 3.7 KB
16/08/17 10:32:03 INFO ContextCleaner: Cleaned accumulator 10
まず私は、データをロードしています作業者のメモリを増やし、JAVA_OPTSを次のように変更しました。
export SPARK_MASTER_IP=127.0.0.1
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_WORKER_MEMORY=6g"
export SPARK_MEM=6g"
export SPARK_DAEMON_MEMORY=6g"
export SPARK_JAVA_OPTS=""-Dspark.executor.memory=6g -Dspark.storage.memoryFraction=0.66 -Dspark.serializer=org.apache.spark.serializer.JavaSerializer -Dspark.executor.memory=6g -Dspark.locality.wait=60000000"
export JAVA_OPTS=""-Xms6G -Xmx6G"""
しかし何もそれを助けませんでした。このタイプのメモリ問題をどう対処することができますかをお勧めします。
最初のエラーを投稿していないようですが、ログに以前のものがあるはずです。 – marmouset
'trainRaw.first()'は機能しますか? – ShuaiYuan