2016-08-08 4 views
1

私は距離を得るために座標を比較する必要があります。そのため、私はsc.textFile()でデータをロードし、デカルト積を作成します。テキストファイルには約2.000.000行あり、したがって座標は2.000.000 x 2.000.000です。スパークデカルト積

約2.000の座標でコードをテストしたところ、数秒で正常に動作しました。しかし、大きなファイルを使用すると、特定の時点で停止しているように見えますが、理由はわかりません。コードは次のようになります。

def concat(x,y): 
    if(isinstance(y, list)&(isinstance(x,list))): 
     return x + y 
    if(isinstance(x,list)&isinstance(y,tuple)): 
     return x + [y] 
    if(isinstance(x,tuple)&isinstance(y,list)): 
     return [x] + y 
    else: return [x,y] 

def haversian_dist(tuple): 
    lat1 = float(tuple[0][0]) 
    lat2 = float(tuple[1][0]) 
    lon1 = float(tuple[0][2]) 
    lon2 = float(tuple[1][2]) 
    p = 0.017453292519943295 
    a = 0.5 - cos((lat2 - lat1) * p)/2 + cos(lat1 * p) * cos(lat2 * p) * (1 - cos((lon2 - lon1) * p))/2 
    print(tuple[0][1]) 
    return (int(float(tuple[0][1])), (int(float(tuple[1][1])),12742 * asin(sqrt(a)))) 

def sort_val(tuple): 
    dtype = [("globalid", int),("distance",float)] 
    a = np.array(tuple[1], dtype=dtype) 
    sorted_mins = np.sort(a, order="distance",kind="mergesort") 
    return (tuple[0], sorted_mins) 


def calc_matrix(sc, path, rangeval, savepath, name): 
    data = sc.textFile(path) 
    data = data.map(lambda x: x.split(";")) 
    data = data.repartition(100).cache() 
    data.collect() 
    matrix = data.cartesian(data) 
    values = matrix.map(haversian_dist) 
    values = values.reduceByKey(concat) 
    values = values.map(sort_val) 
    values = values.map(lambda x: (x[0], x[1][1:int(rangeval)].tolist())) 
    values = values.map(lambda x: (x[0], [y[0] for y in x[1]])) 
    dicti = values.collectAsMap() 
    hp.save_pickle(dicti, savepath, name) 

約15.000のエントリのファイルさえも動作しません。私はデカルトの原因O(n^2)ランタイムを知っています。しかし、これを起こしてはいけませんか?または何か間違っている?唯一の出発点は、エラーメッセージですが、それは実際の問題に関連している場合、私は知らない。

16/08/06 22:21:12 WARN TaskSetManager: Lost task 15.0 in stage 1.0 (TID 16, hlb0004): java.net.SocketException: Daten?bergabe unterbrochen (broken pipe) 
    at java.net.SocketOutputStream.socketWrite0(Native Method) 
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) 
    at java.io.DataOutputStream.write(DataOutputStream.java:107) 
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97) 
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:440) 
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741) 
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 

16/08/06 22:21:12 INFO TaskSetManager: Starting task 15.1 in stage 1.0 (TID 17, hlb0004, partition 15,PROCESS_LOCAL, 2408 bytes) 
16/08/06 22:21:12 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 8, hlb0004): java.net.SocketException: Connection reset 
    at java.net.SocketInputStream.read(SocketInputStream.java:209) 
    at java.net.SocketInputStream.read(SocketInputStream.java:141) 
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) 
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265) 
    at java.io.DataInputStream.readInt(DataInputStream.java:387) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:139) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
+0

あなたはあなたのデータの例を与えることができますか?また、 'dist(u、v)== dist(v、u)'や 'dist(u、u)== 0'や何らかの定数を使うと、計算数を'(n *(n -1)))/ 2 'の対である。 – jtitusj

+0

1行は "94.5406036377; 1313316.000000000000000; 32.791301727300002; 5"のように見えますが、これを減らすために使用できますが、計算前でも停止していると思います。デカルトが構築されている間にこれを実装できますか? –

+0

あなたはあなたが使用したhaversine距離式に私を指摘し、定数「p」と12742を私に説明できますか?距離の計算に問題があるようです。 – jtitusj

答えて

3

あなたは、基本的には、1台のマシンにすべてのデータを呼び出して、あなたのコード内でdata.collect()を使用しました。そのマシンのメモリによっては、2,000,000行のデータがうまく収まらない場合があります。

また、cartesianの代わりにジョインを行うことで、計算の回数を減らそうとしました。 (私はnumpyのを使用して乱数を生成し、ここでのフォーマットは、あなたが持っているものと異なる場合があることに注意してください。それでも、主な考え方は同じである。)

import numpy as np 
from numpy import arcsin, cos, sqrt 

# suppose my data consists of latlong pairs 
# we will use the indices for pairing up values 
data = sc.parallelize(np.random.rand(10,2)).zipWithIndex() 
data = data.map(lambda (val, idx): (idx, val)) 

# generate pairs (e.g. if i have 3 pairs with indices [0,1,2], 
# I only have to compute for distances of pairs (0,1), (0,2) & (1,2) 
idxs = range(data.count()) 
indices = sc.parallelize([(i,j) for i in idxs for j in idxs if i < j]) 

# haversian func (i took the liberty of editing some parts of it) 
def haversian_dist(latlong1, latlong2): 
    lat1, lon1 = latlong1 
    lat2, lon2 = latlong2 
    p = 0.017453292519943295 
    def hav(theta): return (1 - cos(p * theta))/2 
    a = hav(lat2 - lat1) + cos(p * lat1)*cos(p * lat2)*hav(lon2 - lon1) 
    return 12742 * arcsin(sqrt(a)) 

joined1 = indices.join(data).map(lambda (i, (j, val)): (j, (i, val))) 
joined2 = joined1.join(data).map(lambda (j, ((i, latlong1), latlong2)): ((i,j), (latlong1, latlong2)) 
haversianRDD = joined2.mapValues(lambda (x, y): haversian_dist(x, y)) 
+0

これはで動作するようです私は今、エラー "slurmstepd:エラー:いくつかのポイントでステップのメモリ制限を超過しました。"お返事をありがとうございます!私はこの新しいエラーを解決できることを願っています^^ –