2016-09-21 15 views
11

私はPySparkトレーニングコースを卒業しましたが、コードラインの例のスクリプトをコンパイルしています(これはコードブロックが何もしない理由を説明しています)。このコードを実行するたびに、このエラーが1回か2回発生します。それをスローする行は実行の間に変わります。私はspark.executor.memoryspark.executor.heartbeatIntervalを設定しようとしましたが、エラーは解決しません。私はまた、変更なしで様々な行の最後に.cache()を入れてみました。PySparkが "Socket is closed"というランダムなエラーで失敗するのはなぜですか?

エラー:

16/09/21 10:29:32 ERROR Utils: Uncaught exception in thread stdout writer for python 
java.net.SocketException: Socket is closed 
     at java.net.Socket.shutdownOutput(Socket.java:1551) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply$mcV$sp(PythonRDD.scala:344) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344) 
     at org.apache.spark.util.Utils$.tryLog(Utils.scala:1870) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:344) 
     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857) 
     at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269) 

コード:

from pyspark import SparkConf, SparkContext 

def parseLine(line): 
    fields = line.split(',') 
    return (int(fields[0]), float(fields[2])) 

def parseGraphs(line): 
    fields = line.split() 
    return (fields[0]), [int(n) for n in fields[1:]] 

# putting the [*] after local makes it run one executor on each core of your local PC 
conf = SparkConf().setMaster("local[*]").setAppName("MyProcessName") 

sc = SparkContext(conf = conf) 

# parse the raw data and map it to an rdd. 
# each item in this rdd is a tuple 
# two methods to get the exact same data: 
########## All of these methods can use lambda or full methods in the same way ########## 
# read in a text file 
customerOrdersLines = sc.textFile("file:///SparkCourse/customer-orders.csv") 
customerOrdersRdd = customerOrdersLines.map(parseLine) 
customerOrdersRdd = customerOrdersLines.map(lambda l: (int(l.split(',')[0]), float(l.split(',')[2]))) 
print customerOrdersRdd.take(1) 

# countByValue groups identical values and counts them 
salesByCustomer = customerOrdersRdd.map(lambda sale: sale[0]).countByValue() 
print salesByCustomer.items()[0] 

# use flatMap to cut everything up by whitespace 
bookText = sc.textFile("file:///SparkCourse/Book.txt") 
bookRdd = bookText.flatMap(lambda l: l.split()) 
print bookRdd.take(1) 

# create key/value pairs that will allow for more complex uses 
names = sc.textFile("file:///SparkCourse/marvel-names.txt") 
namesRdd = names.map(lambda line: (int(line.split('\"')[0]), line.split('\"')[1].encode("utf8"))) 
print namesRdd.take(1) 

graphs = sc.textFile("file:///SparkCourse/marvel-graph.txt") 
graphsRdd = graphs.map(parseGraphs) 
print graphsRdd.take(1) 

# this will append "extra text" to each name. 
# this is faster than a normal map because it doesn't give you access to the keys 
extendedNamesRdd = namesRdd.mapValues(lambda heroName: heroName + "extra text") 
print extendedNamesRdd.take(1) 

# not the best example because the costars is already a list of integers 
# but this should return a list, which will update the values 
flattenedCostarsRdd = graphsRdd.flatMapValues(lambda costars: costars) 
print flattenedCostarsRdd.take(1) 

# put the heroes in ascending index order 
sortedHeroes = namesRdd.sortByKey() 
print sortedHeroes.take(1) 

# to sort heroes by alphabetical order, we switch key/value to value/key, then sort 
alphabeticalHeroes = namesRdd.map(lambda (key, value): (value, key)).sortByKey() 
print alphabeticalHeroes.take(1) 

# make sure that "spider" is in the name of the hero 
spiderNames = namesRdd.filter(lambda (id, name): "spider" in name.lower()) 
print spiderNames.take(1) 

# reduce by key keeps the key and performs aggregation methods on the values. in this example, taking the sum 
combinedGraphsRdd = flattenedCostarsRdd.reduceByKey(lambda value1, value2: value1 + value2) 
print combinedGraphsRdd.take(1) 

# broadcast: this is accessible from any executor 
sentData = sc.broadcast(["this can be accessed by all executors", "access it using sentData"]) 

# accumulator: this is synced across all executors 
hitCounter = sc.accumulator(0) 
+0

どのステップでエラーが返されるか教えてください。皆さんは作品を印刷していますか? –

+0

おそらく、送信元ポートと宛先ポートが混乱している可能性があります。デフォルトの接続パターン 'Any(available)>> Target Port'おそらくデフォルトのポートが80で、80ポートに接続できません。 Wiresharkとクライアントとサーバーの接続を確認することを強くお勧めします。 – dsgdfg

+0

Sparkのバージョンは何ですか?あなたは 'pyspark'を起動し、エラーなしでいくつかのコマンドをタイプできますか?それはWindowsだよね?あなたはどのように上記のコードを実行しますか? –

答えて

0

免責事項:私はスパークのコードベースの部分に十分な時間を費やしたが、私はあなたに一部を与えてみましょうしていません解決策につながるかもしれないヒント。以下では、問題の解決策ではなく、より多くの情報をどこで検索するかを説明するだけです。


コードhereに見られるように、あなたが直面している例外は、(あなたがworker.shutdownOutput()が実行されたときであるラインjava.net.Socket.shutdownOutput(Socket.java:1551)で見ることができるよう)他のいくつかの問題が原因です。

16/09/21 10:29:32 ERROR Utils: Uncaught exception in thread stdout writer for python 
java.net.SocketException: Socket is closed 
     at java.net.Socket.shutdownOutput(Socket.java:1551) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply$mcV$sp(PythonRDD.scala:344) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344) 
     at org.apache.spark.util.Utils$.tryLog(Utils.scala:1870) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:344) 
     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857) 
     at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269) 

これは、ERRORが他の以前のエラーのフォローアップであると私に信じさせます。

パイソンの名前STDOUTライターが(EvalPythonExec物理演算子を使用して)というthe name of the threadでは、Sparkとpyspark(あなたがあまり変化せずにPythonコードを実行することができます)との間の通信のための責任があります。

実際にはthe scaladoc of EvalPythonExecは、内部的にpysparkが使用し、外部のPythonプロセスにソケットを使用する基盤となる通信インフラストラクチャについてかなりの情報を提供します。 (あなたがpysparkシェルスクリプトherehereで見ることができるように)PYSPARK_DRIVER_PYTHONまたはPYSPARK_PYTHONを使用して上書きしない限り、

Python evaluation works by sending the necessary (projected) input data via a socket to an external Python process, and combine the result from the Python process with the original row.

また、pythonがデフォルトで使用されています。これは、失敗したスレッドの名前に表示される名前です。

16/09/21 10:29:32 ERROR Utils: Uncaught exception in thread stdout writer for python

次のコマンドを使用して、ご使用のシステムでPythonのバージョンをチェックすることをおすすめします。

python -c 'import sys; print(sys.version_info)' 

That should be Python 2.7+ていますが、うまくスパークでテストされていない非常に最新のPythonを使っているかもしれません。 推測...


あなたはpysparkアプリケーションの実行のログ全体を含めるべきであると私は答えを見つけることを期待したいところです。

関連する問題