2016-04-26 14 views
1

非常に長い(数十億行)とやや広い(数百列)RDDを持っています。私は各列に一意の値のセットを作成したい(これらのセットは列ごとに500個を超えるユニークな値を含まないため、これらのセットを並列化する必要はありません)。私はここでやっていることは、空のセットのリスト、私のRDD内の各列の1をinitateしようとしているPySparkのRDDの各列に異なる値を見つける

data = sc.parallelize([["a", "one", "x"], ["b", "one", "y"], ["a", "two", "x"], ["c", "two", "x"]]) 
num_columns = len(data.first()) 
empty_sets = [set() for index in xrange(num_columns)] 
d2 = data.aggregate((empty_sets), (lambda a, b: a.add(b)), (lambda x, y: x.union(y))) 

:ここ

は、私がこれまで持っているものです。集計の最初の部分については、行ごとに dataを繰り返し、列の nの値を n番目のセットのセットに追加します。値がすでに存在する場合は、何もしません。その後、後でセットの unionを実行するので、すべてのパーティションにわたって個別の値のみが返されます。

私はこのコードを実行しようとすると、私は次のエラーを取得する:

AttributeError: 'list' object has no attribute 'add'

私は問題は、私は正確にそれを明確に私が(セットのリストを反復処理していますことを行うわけではないということであると信じてempty_sets)、各行の列をdataに繰り返しています。私は(lambda a, b: a.add(b))aempty_setsであり、bdata.first()(単一の値ではない)であると信じています。これは明らかに機能せず、意図した集約ではありません。

私のリストとデータフレームの各行を通じて、各値を対応するセットオブジェクトに追加するにはどうすればよいですか?所望の出力は次のようになり

[set(['a', 'b', 'c']), set(['one', 'two']), set(['x', 'y'])]


私は私のユースケースに非常に類似している、この例here見てきたPS(私が使用するアイデアを得たところ、それはです最初はaggregate)。しかし、コードをPySparkに変換するのは非常に難しいですが、私はcasezipのコードが何をしているのかはっきりしていません。

答えて

1

2つの問題があります。 1つは、コンバイナ関数は各行が単一のセットであると仮定していますが、セットのリストで操作しています。 2つのaddは何も返しません(a = set(); b = a.add('1'); print b)を試してください)。最初のコンバイナ関数はNoneのリストを返します。これを修正するには、最初のコンバイナ関数を非匿名にして、両方のセットをリストのリストにループさせます:

def set_plus_row(sets, row): 
    for i in range(len(sets)): 
     sets[i].add(row[i]) 
    return sets 


unique_values_per_column = data.aggregate(
    empty_sets, 
    set_plus_row, # can't be lambda b/c add doesn't return anything 
    lambda x, y: [a.union(b) for a, b in zip(x, y)] 
) 

私はzipはScalaで何をするかわからないんだけど、Pythonで、それは二つのリストを取り、一度に2つのリストをループすることができますので、タプル(x = [1, 2, 3]; y = ['a', 'b', 'c']; print zip(x, y);を試してみてください)に合わせて、それぞれ対応する要素を置きます。