N個のアイテムの間にペアワイズスコアマトリックスを作成することから始まるSparkJobがあります。集中的ですが、これは約20K要素までかなり高速ですが、それ以降は非常に長い間停滞しているようです。私が複数の試みで見た最後のログラインは 'clean accumulator'でした。以下のコードブロックを添付して、ランダムに作成された50K要素のデータセットで問題を再現しました。デカルト製品は非常に高速で、RDDの結果は2〜3分で返されますが、ログやSpark Jobs UIの進捗状況が更新されずに2番目のカウントが2時間以上滞ってしまいます。私は15 EC2 M3.2xLargeノードのクラスタを持っています。ここで何が起こっているのか、これをスピードアップするために何ができるのかを私はどのように理解できますか?RDDを実現している間にPySparkの仕事が停止しているようです。
import random
from pyspark.context import SparkContext
from pyspark.sql import HiveContext, SQLContext
import math
from pyspark.sql.types import *
from pyspark.sql.types import Row
sc=SparkContext(appName='kmedoids_test')
sqlContext=HiveContext(sc)
n=50000
A = [random.normalvariate(0, 1) for i in range(n)]
B = [random.normalvariate(1, 1) for i in range(n)]
C = [random.normalvariate(-1, 0.5) for i in range(n)]
df = sqlContext.createDataFrame(zip(A,B,C), ["A","B","C"])
f = lambda x, y : math.pow((x.A - y.A), 2) + math.pow((x.B - y.B), 2) + math.pow((x.C - y.C), 2)
schema = StructType([StructField("row_id", LongType(), False)] + df.schema.fields[:])
no_of_cols=len(df.columns)
rdd_zipped_with_index=df.rdd.zipWithIndex()
reconstructed_rdd = rdd_zipped_with_index.map(lambda x: [x[1]]+list(x[0][0:no_of_cols]))
indexed_df=reconstructed_rdd.toDF(schema)
indexed_rdd = indexed_df.rdd
sc._conf.set("spark.sql.autoBroadcastJoinThreshold","-1") #turning off broadcast join
rdd_cartesian_prod = indexed_rdd.cartesian(indexed_rdd)
print "----------Count in self-join--------------- {0}".format(rdd_cartesian_prod.count()) #this returns quickly in about 160s
ScoreVec = Row("head_id","tail_id","score")
output_rdd = rdd_cartesian_prod.map(lambda x : ScoreVec(float(x[0].row_id), float(x[1].row_id), float(f(x[0], x[1]))))
print "-----------Count after scoring--------------- {0}".format(output_rdd.count()) #gets stuck here for a LONG time
output_df = output_rdd.toDF() #does not get here
「クリーンアキュムレータ」は、あまり冗長ではないと言っても、Sparkが常に吐き出す行です。 – Jeff