2016-04-20 13 views
0

this多くの列を持つCSVファイルを読み込み、Sparkを使用して列間の相関関係を計算しようとしています。多くの列を持つCSVファイルを読み込むとPythonワーカーがクラッシュする

from pyspark import SparkContext, SparkConf 
from pyspark.mllib.stat import Statistics 

conf = SparkConf()\ 
    .setAppName("Movie recommender")\ 
    .setMaster("local[*]")\ 
    .set("spark.driver.memory", "10g")\ 
    .set("spark.driver.maxResultSize", "4g") 

sc = SparkContext(conf=conf) 


pivot = sc.textFile(r"pivot.csv") 
header = pivot.first() 
pivot = pivot.filter(lambda x:x != header) 
pivot = pivot.map(lambda x:x.split()).cache() 
corrs = Statistics.corr(pivot) 

私はこのエラーを取得する:私は増加パーティションでこれを実行するために管理

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error 
    at java.net.SocketOutputStream.socketWrite0(Native Method) 
    at java.net.SocketOutputStream.socketWrite(Unknown Source) 
    at java.net.SocketOutputStream.write(Unknown Source) 

答えて

0

。しかし、実際には動作していないローカルマシンではパフォーマンスが実際には遅いです。列の数が多い場合、パフォーマンスの問題が予想されます。

def extract_sparse(str_lst, N): 
    if len(str_lst) == 0: 
     return (0, {}) 
    else: 
     keyvalue = {} 
     length = len(str_lst) 
     if length > N: 
      length = N 
     for i in range(length): 
      if str_lst[i] != '': # not missing 
       keyvalue[i] = float(str_lst[i]) 

     return (length, keyvalue) 

pivot = sc.textFile(r"pivot.csv", 24) 
header = pivot.first() 
pivot = pivot.filter(lambda x:x != header) 
pivot = pivot.map(lambda x:x.split(',')) 
pivot = pivot.map(lambda x: extract_sparse(x, 50000)) 
pivot = pivot.map(lambda x: Vectors.sparse(x[0], x[1])) 
pivot = pivot.map(lambda x: x.toArray()).collect() 
corrs = Statistics.corr(pivot)