2016-05-24 8 views
1

私は2次RDDSありますleftOuterJoinで空のデフォルト値を初期化することはできますか?

name_to_hour = sc.parallelize([("Amy", [7,8,7,18,19]), ("Dan", [6,7]), ("Emily", [1,2,3,7,7,7,2])]) 

name_biz = sc.parallelize(["Amy", "Brian", "Chris", "Dan", "Emily"]) 

をし、私の結果RDDは、次のようになりますので、私は、それらに参加したい:

[('Amy', [7, 8, 7, 18, 19]), ('Chris', []), ('Brian', []), ('Dan', [6, 7]), ('Emily', [1, 2, 3, 7, 7, 7, 2])] 

私は不器用な解決策と思われるものとそれを達成することができます:

from pyspark import SparkContext 

sc = SparkContext() 

name_to_hour = sc.parallelize([("Amy", [7,8,7,18,19]), ("Dan", [6,7]), ("Emily", [1,2,3,7,7,7,2])]) 

name_biz = sc.parallelize(["Amy", "Brian", "Chris", "Dan", "Emily"]) 

temp = name_biz.map(lambda x: (x, [])) 

joined_rdd = temp.leftOuterJoin(name_to_hour) 

def concat(my_tup): 
    if my_tup[1] is None: 
     return [] 
    else: 
     return my_tup[1] 

result_rdd = joined_rdd.map(lambda x: (x[0], concat(x[1]))) 

print "\033[0;34m{}\033[0m".format(result_rdd.collect()) 

もっと良い方法がありますか?

私はそれが何らかの形で非空のフィールドは、彼らがname_to_hour[]の空のgetデフォルト値に持っていたものに保つこと、leftOuterJoinに指定することが可能であったならば、私の問題は、はるかに容易に解決することができることを考えて、私ましたそのような方法があるとは思わない。

答えて

1

この問題にアプローチする方法の1つは、Pythonリストの辞書順を利用することです。空のリストは常にあるので、私たちは単にunionを作り、maxを減らすことができます非空の「未満」:もちろん

temp.union(name_to_hour).reduceByKey(max) 

これは、キーが一意であることを前提としています。

関連する問題