2016-04-16 3 views
1

私は1つのキーに対して複数の値(リスト)を持つrddを持っています。キーの各値からガベージをフィルタリングしたいと思います。1つのキーpysparkの複数の値のマップ

RDDは、このデータ

((key1, [('',val1),('', val2),..]),(key2,[...) 

は、私はマップ機能が、ここで必要とされて知っているが、私は反対複数の値のためにマップを使用していないが、この

((key1,[val1, val2,...]), key2[...) 

のようなものにそれをマップするましたかぎ。

これはこれを行うための私の努力です。

def mapper(x): 
    values = [] 
    for a in x[1]: 
     values.append(a[1]) 
    return(x[0], ap) 
listRdd.map(mapper).collect() 

が、私はいくつかのエラー

答えて

1

を取得する主なアイデアは、単一のコレクションとしてRDDの各エントリのプロセスとしてそれを考慮することです。意味、我々が期待される出力には、このコレクションを処理するために、次のエントリ

entry = ("key1", [('',"val1"),('',"val2")]) 

を考えると、我々はコレクション

entry[0] 
# 'key1' 

entry[1] 
# [('', 'val1'), ('', 'val2')] 

の構造を理解する必要があり、今のこの第二部では機能してみましょう:

map(lambda x : x[1],entry[1]) 
# ['val1', 'val2'] 

入力を入力とし、結果の出力は(キー、[値...])タプルになる関数を定義できるようになりました。我々はそれをmapperと呼ぶでしょう。 rddのすべてのエントリにマッパーを適用できます。一緒にコードを置く

def mapper(entry): 
    return (entry[0],map(lambda x : x[1],entry[1])) 

data = [("key1", [('',"val1"),('',"val2")]),("key2",[('',"val3"),('',"val2"),('',"val4")])] 

rdd = sc.parallelize(data) 

rdd2 = rdd.map(lambda x : mapper(x)) 

rdd2.collect() 
# [('key1', ['val1', 'val2']), ('key2', ['val3', 'val2', 'val4'])] 
+1

は、私はまったく同じでしたが、マップ機能に構文エラーがありました、あなたは歓迎されている –

+0

かかわら説明してくれてありがとう! – eliasah

関連する問題