2016-06-15 14 views
0

は、Spark StreamingのreduceByKeyAndWindow関数に関する質問があります。これは素朴かもしれませんが、ScalaとSparkの両方にとって新しいものです。 Spark StreamingのreduceByKeyAndWindowを複数の値で使用

を実行する方法はありカウント(ロング): 列1:キー、 コラム2:金額(ロング)、 3列

は私のデータを解析した後、私は3つの列と、このデータセットを持っていますreduceByKeyAndWindow関数を使用して、指定されたウィンドウ上のColumn 2(Amount)とColumn 3(Count)の合計を取得しますか?私が読んだものから、

、reduceByKeyAndWindow機能のように見えるので、私が現在やっていることは二つのデータセットを作成して、2つの列のみ(キー、値)を見込ん: データが1を設定:キー、& & 金額をデータセット2:キー

次に、reduceByKeyAndWindowを2回呼び出しています。 1回だけ行うことができれば素晴らしいと思います(減らすパラメータは量とカウントの両方の列と同じです)

この質問は意味がありますか?どんな入力も素晴らしいでしょう!

答えて

4

簡単にできます。あなたは(key、(v1、v2))にマップし、reduceByKeyAndWindowのために以下のような単純な関数を使用しなければなりません。

val data = Array(("key1",(1L,2L)),("key1",(9L,1L)),("key2",(1L,2L)),("key2",(99L,11L))) 

val rdd = sc.parallelize(k) 

val aggregateFunc: ((Long, Long), (Long, Long)) => (Long, Long) = { 
     case ((v1, w1), (v2, w2)) => { 
     (v1 + v2, w1 + w2) 
     } 
    } 
rdd.reduceByKey(aggregateFunc).collect() 
// Array((key1,(10,3)), (key2,(100,13))) 

上記の機能はreduceByKeyAndWindow

+0

ThxをKnight71のために動作します!これはうまくいった。追加する関数と減算する関数の2つの関数を作成します(ウィンドウ外にあるバッチの場合)。 reduceByKeyAndWindow(reduceByKeyAndWindow(aggregateFuncAdd、aggregateFuncSubtract、Minutes(windowLength.toLong)、Seconds(slideInterval.toLong)、2)関数をreduceByKeyAndWindowに追加するのは簡単でした。 – Shay

0

多分、私はここで範囲外に出ていますが、あなたのものに似たケースのために私がしたことがあります。 Java擬似コード:

//inside your pipeline 
private class AmountCountClass{ 
    private int amount; 
    private int count; 
    public AmountCountClass(int i,int j){ 
     amount=i; 
     count=j; 
    //various methods: get-set, increase-decrease,whatever you need 
} 
JavaPairDStream<Key,AmountCountClass> pairedAndReducedByKey= yourDStream.mapToPair({ 
    return new Tuple2<yourKey, new AmountCountClass(initialAmount,initialCount); 
}).reduceByKeyAndWindow(windowTime,windowSlide, 
    reduceFunction(AmountCountClass arg0, arg1){ 
     arg0.sumAmount(arg1.getAmount()); 
     arg0.sumCount(arg1.getCount()); 
     return arg0; 
}); 

非常にきれいで読みやすい。

関連する問題