2016-06-29 10 views
0

N個のアイテムの間にペアワイズスコアマトリックスを作成することから始まるSparkJobがあります。集中的ですが、これは約20K要素までかなり高速ですが、それ以降は非常に長い間停滞しているようです。私が複数の試みで見た最後のログラインは 'clean accumulator'でした。以下のコードブロックを添付して、ランダムに作成された50K要素のデータセットで問題を再現しました。デカルト製品は非常に高速で、RDDの結果は2〜3分で返されますが、ログやSpark Jobs UIの進捗状況が更新されずに2番目のカウントが2時間以上滞ってしまいます。私は15 EC2 M3.2xLargeノードのクラスタを持っています。ここで何が起こっているのか、これをスピードアップするために何ができるのかを私はどのように理解できますか?RDDを実現している間にPySparkの仕事が停止しているようです。

import random 
from pyspark.context import SparkContext 
from pyspark.sql import HiveContext, SQLContext 
import math 
from pyspark.sql.types import * 
from pyspark.sql.types import Row 
sc=SparkContext(appName='kmedoids_test') 
sqlContext=HiveContext(sc) 
n=50000 
A = [random.normalvariate(0, 1) for i in range(n)] 
B = [random.normalvariate(1, 1) for i in range(n)] 
C = [random.normalvariate(-1, 0.5) for i in range(n)] 
df = sqlContext.createDataFrame(zip(A,B,C), ["A","B","C"]) 
f = lambda x, y : math.pow((x.A - y.A), 2) + math.pow((x.B - y.B), 2) + math.pow((x.C - y.C), 2) 
schema = StructType([StructField("row_id", LongType(), False)] +  df.schema.fields[:]) 
no_of_cols=len(df.columns) 
rdd_zipped_with_index=df.rdd.zipWithIndex() 
reconstructed_rdd = rdd_zipped_with_index.map(lambda x: [x[1]]+list(x[0][0:no_of_cols])) 
indexed_df=reconstructed_rdd.toDF(schema) 
indexed_rdd = indexed_df.rdd 
sc._conf.set("spark.sql.autoBroadcastJoinThreshold","-1") #turning off broadcast join 
rdd_cartesian_prod = indexed_rdd.cartesian(indexed_rdd) 
print "----------Count in self-join--------------- {0}".format(rdd_cartesian_prod.count()) #this returns quickly in about 160s 
ScoreVec = Row("head_id","tail_id","score") 
output_rdd = rdd_cartesian_prod.map(lambda x : ScoreVec(float(x[0].row_id), float(x[1].row_id), float(f(x[0], x[1])))) 
print "-----------Count after scoring--------------- {0}".format(output_rdd.count()) #gets stuck here for a LONG time 
output_df = output_rdd.toDF() #does not get here 
+0

「クリーンアキュムレータ」は、あまり冗長ではないと言っても、Sparkが常に吐き出す行です。 – Jeff

答えて

0

これが原因にlazy evaluation

スパークが同じである可能性があります。それはあなたがそれが演算子を与えられて終了するまで待って、あなたが最終的な答えを与えるためにそれを求めるときにのみ、それは評価し、常にそれが何をしなければならないかを制限するように見えます。

デカルト積後の行数はindexed_rdd.count()^2になります。スパークは実際にどれくらいの数があるかを知るためにこれらの行をすべて生成する必要はありません。 output_rdd.count()では行数は同じですが、Sparkは実際にすべてのデータを処理しており、カウントする前にマッピングしています。そういうわけで、この仕事はずっと長い時間を要しています。これが起こっていることを証明するために、indexed_rdd.cache().count()を試すことができます。カウントする前にキャッシングすると、データ処理が強制され(結果がメモリに保存されます)、時間がかかります。

+0

Davidに感謝します。あなたが提案したように、私はrdd_cartesian_prod.cache()。count()を試して、自己結合RDDの処理を強制しました。残りの仕事は30kデータセットで45分以上かかります。どうすればこれをスピードアップできますか?並列化の増加は生産性を低下させ、より多くの結果をもたらすでしょうか? – 4a616e

関連する問題