2016-12-15 3 views
0

これは可能かどうかにかかわらず、単純なカウントでいくつかのキーについてすべてのMicrobatchesの最新のものを表示するかどうかは不明です。標準化されたスパークストリーミングは、適切な方法で結合または結合しないと!現在の処理が処理すべきキーをトップ5に持たないことを考慮に入れて、保存された/保持されたデータのセットを有する。SPARK現在のMicrobatchの新しいデータがない場合、前回のMicrobatchのupdateStateByKey値

I.最初のMicrobatchの現在の処理のトップ5がx、y、z、a、b、および、である場合、次のMicrobatchがx、c、mをデータとしてのみ有する場合、a、b、yおよびcとmが上位5の点でこれらのプリオーダーよりも少ない場合は、上位5の一部としてzを使用しますか?

悪い使用例かもしれません。

+0

後ろ向きのドキュメントは不明です。 – thebluephantom

答えて

1
rdd1 = sc.parallelize(list('abcd')).map(lambda x: (x, 110 - ord(x))) 
rdd2 = sc.parallelize(list('cdef')).map(lambda x: (x, 2)) 

rddQueue = ssc.queueStream([rdd1, rdd2]) 


def func(new_values, old_value): 
    return sum(new_values) + (old_value or 0) 


rddQueue = rddQueue.updateStateByKey(func).transform(lambda x: x.sortBy(lambda y: y[1], ascending=False)) 

rddQueue.pprint() 

出力:

-------------------------------------------          
Time: 2016-12-16 11:06:54 
------------------------------------------- 
('a', 13) 
('b', 12) 
('c', 11) 
('d', 10) 

-------------------------------------------          
Time: 2016-12-16 11:06:57 
------------------------------------------- 
('a', 13) 
('c', 13) 
('b', 12) 
('d', 12) 
('f', 2) 
('e', 2) 

'取得' とは何を意味?

+0

私はこれについて考える必要があります。 – thebluephantom

+0

問題は概念的には簡単ですが、上記のことが実際に連続したマイクロバッチ/ウィンドウで更新される典型的なDstreamにどのように関係しているかはわかりません。私はこのリンクhttps://www.rittmanmead.com/blog/2015/08/combining-spark-streaming-and-data-frames-for-near-real-time-log-analysis/を見つけ、いくつかの印象を受ける永続化が必要な場合があります。私は熟練した選手だが、Sp Strで合理的に新しいので、万全なコードを提供することはできますか? – thebluephantom

+0

よく、私は 'a'、 'b'、c '、' d 'とは対照的に' abcd 'で私をテストしようとしているのではないかと思います – thebluephantom

0

reduceByKeyAndWindowを使用します。それを除いて絶対確実ではありません。データストアへの永続化が行われない限り、スライディングウィンドウは、スライディングウィンドウよりも長い期間にわたってトップNを失う可能性があります。 おそらくそうではありません。

+0

私はここで混乱しているかもしれません。 – thebluephantom

関連する問題