RDDでキーをいくつかの行にグループ化したいので、1つのグループ内の行でより高度な操作を実行できます。注目すべきは、私は単にいくつかの集計値を計算したくないということです。行はキーと値のペアです。キーはGUIDで、値は複雑なオブジェクトです。pyspark combineByKey(groupByKeyと対照的な)の結果が一致しません
pysparkのドキュメントによれば、私はまずgroupByKeyよりも性能が良いと思われるので、combineByKeyでこれを実装しようとしました。先頭のリストには、ちょうど説明のためではなく、私の実際のデータ:
l = list(range(1000))
numbers = sc.parallelize(l)
rdd = numbers.map(lambda x: (x % 5, x))
def f_init(initial_value):
return [initial_value]
def f_merge(current_merged, new_value):
if current_merged is None:
current_merged = []
return current_merged.append(new_value)
def f_combine(merged1, merged2):
if merged1 is None:
merged1 = []
if merged2 is None:
merged2 = []
return merged1 + merged2
combined_by_key = rdd.combineByKey(f_init, f_merge, f_combine)
c = combined_by_key.collectAsMap()
i = 0
for k, v in c.items():
if v is None:
print(i, k, 'value is None.')
else:
print(i, k, len(v))
i += 1
これの出力は次のとおりです。私が期待したものではありません
0 0 0
1 1 0
2 2 0
3 3 0
4 4 0
。
grouped_by_key = rdd.groupByKey()
d = grouped_by_key.collectAsMap()
i = 0
for k, v in d.items():
if v is None:
print(i, k, 'value is None.')
else:
print(i, k, len(v))
i += 1
戻り値:
0 0 200
1 1 200
2 2 200
3 3 200
4 4 200
私は何かが欠けていない限り、これはgroupByKeyがreduceByKeyまたはcombineByKeyよりも優先される場合がある(groupByKeyで実装同じロジックが、正しい出力を返します。関連する議論のトピック:Is groupByKey ever preferred over reduceByKey)。
ありがとうございました。問題が解決されたので、私は両方の選択肢のパフォーマンスをテストしました。combineByKeyの方がいくらか高速でした(データセットが大きいほど、より劇的な違いがあります)。私は、combineByKeyとgroupByKeyが異なって動作し、マージ中にgroupByKeyがそのデータのローカリティを使用するように最適化されるという保証はないので、その理由があると思います:http://codingjunkie.net/spark-combine-by-key/ – Eftim
ここには2つの問題があります。 a)Scalaは、map side aggregationを特に無効にします(これは、データの局所性を使用するために_optimizationを使用することを意味します)。GC時間を短縮してパフォーマンスを向上させます。b)Pythonの実装はScalaによって提供されるものと等価ではなく、実際はcombineByKeyによって実装されます。 – zero323