2016-12-02 11 views
-3

私は2つのrddを持っています。例えば、は、pysparkの2つのrddの各値を比較します。

employee = [(31, ['Raffery', 31, 'a', 'b']), 
       (33, ['Jones', 33, '1', 'b']), 
       (32, ['Heisenberg', 33, 'a', 'b']), 
       (37, ['Robinson', 34, 'c', 'cc']), 
       (38, ['Smith', 34, 'a', 'b'])]` 

department = [[(31, ['Raffery', 31, 'c', 'b']), 
       (33, ['Jones', 33, 'a', 'b']), 
       (34, ['Heisenberg', 33, 'a', 'b'])]` 

私は各キーの二番目と最初のRDDの要素を比較したい:

出力は

31のようになりますと、故障が電子である[1 ] [2]

33と障害がeである[1] [2]

答えて

1

私は、出力は正確にその形式であることが必要であるか厳格わからないんだけど、次はあなたに、ほぼすべての方法のを取得する必要があります。

使用pysparkのデータフレーム:

>>> employee = spark.createDataFrame([(31, ['Raffery', 31, 'a', 'b']), (33, ['Jones', 33, '1', 'b']), (32, ['Heisenberg', 33, 'a', 'b'])], ["id_e", "list_e"]) 
>>> employee.show() 
+----+----------------------+ 
|id_e|list_e    | 
+----+----------------------+ 
|31 |[Raffery, 31, a, b] | 
|33 |[Jones, 33, 1, b]  | 
|32 |[Heisenberg, 33, a, b]| 
+----+----------------------+ 

>>> department = spark.createDataFrame([(31, ['Raffery', 31, 'c', 'b']), (33, ['Jones', 33, 'a', 'b']), (34, ['Heisenberg', 33, 'a', 'b'])], ["id_d", "list_d"]) 
>>> department.show() 
+----+----------------------+ 
|id_d|list_d    | 
+----+----------------------+ 
|31 |[Raffery, 31, c, b] | 
|33 |[Jones, 33, a, b]  | 
|34 |[Heisenberg, 33, a, b]| 
+----+----------------------+ 

は、何にこれらの参加します

>>> joined.rdd.map(lambda row: (row.id_e, [i for i in range(4) if row.list_d[i] != row.list_e[i]])).collect() 
[(31, [2]), (33, [2])] 

>>> joined = employee.join(department, employee.id_e == department.id_d) 
>>> joined.show() 
+----+-------------------+----+-------------------+ 
|id_e|    list_e|id_d|    list_d| 
+----+-------------------+----+-------------------+ 
| 31|[Raffery, 31, a, b]| 31|[Raffery, 31, c, b]| 
| 33| [Jones, 33, 1, b]| 33| [Jones, 33, a, b]| 
+----+-------------------+----+-------------------+ 

が次にデータフレームの間で共有されていない要素のユーザーリストのインデックスをマッピングする:私はユーザIDであり、仮定します

あなたの道に幸運をもたらすことを望みますように。

関連する問題