2016-11-18 14 views
2

Iペア-RDD構造とを有する。 [(キー、[(timestring、値)]]Pyspark:ネストされたリストにマージ値

例:

[("key1", [("20161101", 23), ("20161101", 41), ("20161102", 66),...]), 
("key2", [("20161101", 86), ("20161101", 9), ("20161102", 11),...]) 
    ...] 

私がしたいですtimestringによってグループ化各キーのプロセスのリストを、同一timestringsのすべての値の平均値を算出し、上記の例ではなるので:。

[("key1", [("20161101", 32), ..]), 
("key2", [("20161101", 47.5),...]) 
    ...] 

私は使用して解決策を見つけるのに苦労1つのステップでPysparkのメソッドは、すべて可能ですか、いくつかの中間ステップを使用する必要がありますか?

答えて

1

あなたは関数を定義することができます

from itertools import groupby 
import numpy as np 

def mapper(xs): 
    return [(k, np.mean([v[1] for v in vs])) for k, vs in groupby(sorted(xs), lambda x: x[0])] 

そしてmapValues

rdd = sc.parallelize([ 
    ("key1", [("20161101", 23), ("20161101", 41), ("20161102", 66)]), 
    ("key2", [("20161101", 86), ("20161101", 9), ("20161102", 11)]) 
]) 

rdd.mapValues(mapper) 
関連する問題