1
私は2次RDDSありますleftOuterJoinで空のデフォルト値を初期化することはできますか?
name_to_hour = sc.parallelize([("Amy", [7,8,7,18,19]), ("Dan", [6,7]), ("Emily", [1,2,3,7,7,7,2])])
name_biz = sc.parallelize(["Amy", "Brian", "Chris", "Dan", "Emily"])
をし、私の結果RDDは、次のようになりますので、私は、それらに参加したい:
[('Amy', [7, 8, 7, 18, 19]), ('Chris', []), ('Brian', []), ('Dan', [6, 7]), ('Emily', [1, 2, 3, 7, 7, 7, 2])]
私は不器用な解決策と思われるものとそれを達成することができます:
from pyspark import SparkContext
sc = SparkContext()
name_to_hour = sc.parallelize([("Amy", [7,8,7,18,19]), ("Dan", [6,7]), ("Emily", [1,2,3,7,7,7,2])])
name_biz = sc.parallelize(["Amy", "Brian", "Chris", "Dan", "Emily"])
temp = name_biz.map(lambda x: (x, []))
joined_rdd = temp.leftOuterJoin(name_to_hour)
def concat(my_tup):
if my_tup[1] is None:
return []
else:
return my_tup[1]
result_rdd = joined_rdd.map(lambda x: (x[0], concat(x[1])))
print "\033[0;34m{}\033[0m".format(result_rdd.collect())
もっと良い方法がありますか?
私はそれが何らかの形で非空のフィールドは、彼らがname_to_hour
と[]
の空のgetデフォルト値に持っていたものに保つこと、leftOuterJoin
に指定することが可能であったならば、私の問題は、はるかに容易に解決することができることを考えて、私ましたそのような方法があるとは思わない。