非常に長い(数十億行)とやや広い(数百列)RDDを持っています。私は各列に一意の値のセットを作成したい(これらのセットは列ごとに500個を超えるユニークな値を含まないため、これらのセットを並列化する必要はありません)。私はここでやっていることは、空のセットのリスト、私のRDD内の各列の1をinitateしようとしているPySparkのRDDの各列に異なる値を見つける
data = sc.parallelize([["a", "one", "x"], ["b", "one", "y"], ["a", "two", "x"], ["c", "two", "x"]])
num_columns = len(data.first())
empty_sets = [set() for index in xrange(num_columns)]
d2 = data.aggregate((empty_sets), (lambda a, b: a.add(b)), (lambda x, y: x.union(y)))
:ここ
は、私がこれまで持っているものです。集計の最初の部分については、行ごとにdata
を繰り返し、列の
n
の値を
n
番目のセットのセットに追加します。値がすでに存在する場合は、何もしません。その後、後でセットの
union
を実行するので、すべてのパーティションにわたって個別の値のみが返されます。
私はこのコードを実行しようとすると、私は次のエラーを取得する:
AttributeError: 'list' object has no attribute 'add'
私は問題は、私は正確にそれを明確に私が(セットのリストを反復処理していますことを行うわけではないということであると信じてempty_sets
)、各行の列をdata
に繰り返しています。私は(lambda a, b: a.add(b))
がa
がempty_sets
であり、b
がdata.first()
(単一の値ではない)であると信じています。これは明らかに機能せず、意図した集約ではありません。
私のリストとデータフレームの各行を通じて、各値を対応するセットオブジェクトに追加するにはどうすればよいですか?所望の出力は次のようになり
:
[set(['a', 'b', 'c']), set(['one', 'two']), set(['x', 'y'])]
私は私のユースケースに非常に類似している、この例here見てきたPS(私が使用するアイデアを得たところ、それはです最初はaggregate
)。しかし、コードをPySparkに変換するのは非常に難しいですが、私はcase
とzip
のコードが何をしているのかはっきりしていません。