2016-12-01 19 views
1

後で処理するためにしばらくの間、バッチのデータを蓄積する必要があります。私はSpark 1.6.3を使用しています。
私はそれらをフォーム(tag, [[time, value],..]に蓄積する必要があります。 は、これまでのところ私はupdateStateByKeyを試してみました:pysparkにバッチのデータを保存する

time = [0] 
def updateFunc(new_values, last_sum,time): 
    time[0] += 5 
    if time == 10: 
     time = 0 
     return None 
    return (last_sum or []) + new_values 

data = lines.flatMap(lambda line: line.split(" "))\ 
        .map(lambda word: (word, ['t','t1'])) \ 
        .updateStateByKey(lambda x,y :updateFunc(x,y,time)) 
data.pprint() 

データが追加されています。しかし、10秒後にデータをフラッシュしようとしませんでした。また、私はwindowを使用しようとしました

(私はそれを間違った方法を行っています):

data= lines.flatMap(lambda lime: line.split(' ')\ 
    .map(lambda tag: (tag: ['time', 'value']))\ 
    .window(10, 2)\ 
    .reduceByKey(lambda x,y : y + x)` 


しかし、これは一次元の長いリストを生成します。それは役に立たない。 すべてのリード?ありがとうございました。

答えて

0
items = lines.flatMap(lambda x: list(x)).map(lambda x: (x, [('time', 'value')])) 
counts = items.reduceByKeyAndWindow(lambda x, y: x + y, invFunc=None, windowDuration=3, slideDuration=2) 

この

をお試しください
関連する問題