1

私はVideoID-UserIDのキーと値のペアを持つdstreamを持っていますが、VideoIDによって異なるUserIDグループをカウントするのがよいでしょうか?上記のようにSpark Streaming - 状態の別の要素をカウントする

// VideoID,UserID 
foo,1 
foo,2 
bar,1 
bar,2 
foo,1 
bar,2 

、私はいつでも冗長foo,1bar,2を除去することにより、動画ID-CountUserIDを取得したいので、結果は次のようになります。つまり

foo: 2 
bar: 2 

、私は大きなを保持したいですメモリ内の状態データセット。新しいバッチのdstreamが到着すると、それをデータセットと比較して、すべてのビデオの異なるユーザーを数えます。

どうすればよいですか?

私はSpark 1.6で作業していますが、それ以降のバージョンの回答は受け入れられます。可能であればPythonコード。

答えて

0

ビデオIDでグループ化されたユーザーIDの別個のカウントを取得するには、aggregateByKeyを使用することを検討してください。申し訳ありませんが、これはScalaなので翻訳する必要があります。

val rdd = sc.textFile("your_file.txt") 

val initialSet = Set.empty[Int] 
val addToSet = (s: Set[Int], v:Int) => s + v 
val mergeSets = (s1: Set[Int], s2: Set[Int]) => s1 ++ s2 

val distinctValSets = rdd.aggregateByKey(initialSet)(addToSet, mergeSets) 
val distinctValCountd = rdd.map({case(k,s) => (k,s.size)}) 

初期設定はaddToSetとmergeSetsがあなたのセットに値を追加し、キーに基づいて異なるセットをマージする方法を指定し、あなたの集約オブジェクトの初期値です。これにより、各動画に関連付けられたユーザー数が異なり、reduceByKeyやgroupByKeyよりも安価(スペース単位)です。

+0

お世辞になりました!今では、大きなデータセットをメモリに保持するのは良い方法ではないと思うので、ウィンドウ関数を使用して、代わりに期間内の別のものを数えます。ありがとうございました! –

0
val rdd1 = sc.parallelize(Seq(("foo", 1),("foo", 2),("foo", 1))) 
    rdd1.groupByKey.mapValues(x=>x.toSet.toSeq).flatMapValues(x=>x).collect 
関連する問題