1
後で処理するためにしばらくの間、バッチのデータを蓄積する必要があります。私はSpark 1.6.3を使用しています。
私はそれらをフォーム(tag, [[time, value],..]
に蓄積する必要があります。 は、これまでのところ私はupdateStateByKey
を試してみました:pysparkにバッチのデータを保存する
time = [0]
def updateFunc(new_values, last_sum,time):
time[0] += 5
if time == 10:
time = 0
return None
return (last_sum or []) + new_values
data = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, ['t','t1'])) \
.updateStateByKey(lambda x,y :updateFunc(x,y,time))
data.pprint()
データが追加されています。しかし、10秒後にデータをフラッシュしようとしませんでした。また、私はwindow
を使用しようとしました
(私はそれを間違った方法を行っています):
はdata= lines.flatMap(lambda lime: line.split(' ')\
.map(lambda tag: (tag: ['time', 'value']))\
.window(10, 2)\
.reduceByKey(lambda x,y : y + x)`
しかし、これは一次元の長いリストを生成します。それは役に立たない。 すべてのリード?ありがとうございました。