2016-06-19 12 views
-1

RDDでキーをいくつかの行にグループ化したいので、1つのグループ内の行でより高度な操作を実行できます。注目すべきは、私は単にいくつかの集計値を計算したくないということです。行はキーと値のペアです。キーはGUIDで、値は複雑なオブジェクトです。pyspark combineByKey(groupByKeyと対照的な)の結果が一致しません

pysparkのドキュメントによれば、私はまずgroupByKeyよりも性能が良いと思われるので、combineByKeyでこれを実装しようとしました。先頭のリストには、ちょうど説明のためではなく、私の実際のデータ:

l = list(range(1000)) 
numbers = sc.parallelize(l) 
rdd = numbers.map(lambda x: (x % 5, x)) 

def f_init(initial_value): 
    return [initial_value] 

def f_merge(current_merged, new_value): 
    if current_merged is None: 
     current_merged = [] 
    return current_merged.append(new_value) 

def f_combine(merged1, merged2): 
    if merged1 is None: 
     merged1 = [] 
    if merged2 is None: 
     merged2 = [] 
    return merged1 + merged2 

combined_by_key = rdd.combineByKey(f_init, f_merge, f_combine) 

c = combined_by_key.collectAsMap() 
i = 0 
for k, v in c.items(): 
    if v is None: 
     print(i, k, 'value is None.') 
    else: 
     print(i, k, len(v)) 
    i += 1 

これの出力は次のとおりです。私が期待したものではありません

0 0 0 
1 1 0 
2 2 0 
3 3 0 
4 4 0 

grouped_by_key = rdd.groupByKey() 
d = grouped_by_key.collectAsMap() 
i = 0 
for k, v in d.items(): 
    if v is None: 
     print(i, k, 'value is None.') 
    else: 
     print(i, k, len(v)) 
    i += 1 

戻り値:

0 0 200 
1 1 200 
2 2 200 
3 3 200 
4 4 200 

私は何かが欠けていない限り、これはgroupByKeyがreduceByKeyまたはcombineByKeyよりも優先される場合がある(groupByKeyで実装同じロジックが、正しい出力を返します。関連する議論のトピック:Is groupByKey ever preferred over reduceByKey)。

答えて

0

基本APIを理解することが望ましい場合があります。あなたはlist.appendドキュメンテーション文字列チェックすると特に:

?list.append 
## Docstring: L.append(object) -> None -- append object to end 
## Type:  method_descriptor 

をあなたはそれが慣例により変更されたオブジェクトを返さないのPython API内の他の変異方法のようにそれが表示されます。 f_mergeは常にNoneを返し、何も蓄積されていないことを意味します。

ほとんどの問題では、groupByKeyよりはるかに効率的な解決策があると言われていますが、combineByKey(またはaggregateByKey)で書き換えることは決してこれらの1つではありません。

+0

ありがとうございました。問題が解決されたので、私は両方の選択肢のパフォーマンスをテストしました。combineByKeyの方がいくらか高速でした(データセットが大きいほど、より劇的な違いがあります)。私は、combineByKeyとgroupByKeyが異なって動作し、マージ中にgroupByKeyがそのデータのローカリティを使用するように最適化されるという保証はないので、その理由があると思います:http://codingjunkie.net/spark-combine-by-key/ – Eftim

+0

ここには2つの問題があります。 a)Scalaは、map side aggregationを特に無効にします(これは、データの局所性を使用するために_optimizationを使用することを意味します)。GC時間を短縮してパフォーマンスを向上させます。b)Pythonの実装はScalaによって提供されるものと等価ではなく、実際はcombineByKeyによって実装されます。 – zero323

関連する問題