
PySpark with PyCharm: java.net.SocketException

Since yesterday I have been facing a strange behavior in my pySpark code. I work on Windows with PyCharm and Spark 1.5.

I run the following code successfully in an ipython notebook (same version of Python, but on a cluster):

People=["1,Maj,123","2,Pvt,333","3,Col,999"] 
rrd1=sc.parallelize(People) 
rrd1.first() 

Strangely, when I add some basic instructions and run the code in debug mode from PyCharm, I get an error. Here is the script, followed by the output:

from pprint import pprint

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# SQL/Spark context:
conf = (SparkConf().setMaster("local").setAppName("analysis"))  # .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Input CSV file:
inputCsvFile = "survey.csv"
separator = ','

# read the input file into an RDD and split each line into fields
rdd = sc.textFile(inputCsvFile).map(lambda line: line.split(separator))
header = rdd.first()

# build the schema (some basic functions to create a StructType object
# with string as the default type):
schema = dictSchemaFromColumnsList(header)
schemaDf = dictSchemaToDFSchema(schema)

# create the DataFrame:
df = sqlContext.createDataFrame(rdd, schemaDf)
pprint(rdd.first())
print('\ndf.count()=' + str(df.count()))

# display
df.show()
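
(dictSchemaFromColumnsList and dictSchemaToDFSchema are basic helpers of mine and are not shown above; a minimal sketch of what they could look like, assuming every column defaults to a string type and header order is preserved:)

from collections import OrderedDict
from pyspark.sql.types import StructType, StructField, StringType

def dictSchemaFromColumnsList(columns):
    # map each header column name to a default type name ("string"),
    # keeping the original column order
    return OrderedDict((col, "string") for col in columns)

def dictSchemaToDFSchema(schema):
    # turn that mapping into a Spark StructType of nullable StringType fields
    return StructType([StructField(name, StringType(), True)
                       for name in schema])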

16/06/23 11:46:32 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:410)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:420)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:420)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:420)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:249)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)
16/06/23 11:46:32 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.net.SocketException: Connection reset by peer: socket write error
    (same java.net.SocketException stack trace as above)
16/06/23 11:46:32 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
16/06/23 11:46:32 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/06/23 11:46:32 INFO TaskSchedulerImpl: Cancelling stage 1
16/06/23 11:46:32 INFO DAGScheduler: ResultStage 1 (runJob at PythonRDD.scala:361) failed in 0.792 s
16/06/23 11:46:32 INFO DAGScheduler: Job 1 failed: runJob at PythonRDD.scala:361, took 0.802922 s
Traceback (most recent call last):
  File "C:/Users/home/PycharmProjects/pySpark_analysis/Survey_2011-2016_Analysis.py", line 38, in <module>
    df = sqlContext.createDataFrame(rdd, schemaDf)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\context.py", line 404, in createDataFrame
    rdd, schema = self._createFromRDD(data, schema, samplingRatio)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\context.py", line 296, in _createFromRDD
    rows = rdd.take(10)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\pyspark\context.py", line 916, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py", line 36, in deco
    return f(*a, **kw)
  File "C:\Spark\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.net.SocketException: Connection reset by peer: socket write error
    (same java.net.SocketException stack trace as above)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:361)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
    (same java.net.SocketException stack trace as above)

16/06/23 11:46:32 INFO SparkContext: Invoking stop() from shutdown hook

This is what I got while launching the script with PyCharm in my Windows environment. The code only works sometimes; the execution is not consistent. Any suggestion would be highly appreciated...

UPDATE: Looking back at the problem, it appears to be exactly the same behavior that Matei describes below. Apparently the problem is solved by truncating the input CSV file.
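
(For reference, a quick way to produce such a truncated copy of the input; the survey_small.csv name and the 2000-line cutoff are only illustrative:)

# write only the first 2000 lines of the CSV to a smaller test file
with open("survey.csv") as src, open("survey_small.csv", "w") as dst:
    for i, line in enumerate(src):
        if i >= 2000:
            break
        dst.write(line)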

Answers


I had the same problem when I used a (let's say) large file (20,000 lines) and tried to filter it with a regular expression:

import re

pattern = re.compile("...")  # "..." stands in for the actual regular expression
rdd.filter(lambda x: pattern.search(x) is not None)

The behavior was also intermittent, just as you describe. After truncating the file to ~2,000 lines, it worked without errors.
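
(A sketch of the same truncation done inside Spark instead of on the file itself, reusing the sc, rdd, and pattern objects from above; take(2000) is just the illustrative cutoff:)

# re-parallelize only the first 2000 lines of the original RDD
small_rdd = sc.parallelize(rdd.take(2000))

# the same regex filter now runs on the truncated data
print(small_rdd.filter(lambda x: pattern.search(x) is not None).count())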
