11

私は(sensor_id, timestamp, data)のデータセットを持っています(sensor_idはIoTデバイスのID、タイムスタンプはUNIXタイム、データはその時の出力のMD5ハッシュです)。テーブルには主キーはありませんが、各行は一意です。スパーク:少なくともn個の共通属性を持つペアを見つけますか?

は、私は、彼らが同じタイムスタンプで同じデータを出射さn異なる機会にそれらすなわち間の共通の(timestamp, data)エントリこれら二つのセンサが、少なくともnn=50)を有することsensor_id S s1などs2のすべてのペアを見つける必要があります。

データの大きさの意味では、私は10Bの行と〜50Mの別のsensor_idsを持っており、少なくとも同じタイムスタンプで同じデータを少なくとも50回送信した約5MのセンサーIDがあると私は信じています。

Sparkでこれを行うにはどうすればよいですか?私はさまざまなアプローチ(グループ(timestamp, data)および/または自己結合)を試みましたが、複雑さが非常に高価です。

+3

サンプルデータ、試行されたコード、予想される出力を追加できますか? – mtoto

答えて

0

私の理解が正しければ、私は以下の単純なコードを使用することによって、これを達成することができる午前、

test("Spark: Find pairs having atleast n common attributes"){ 
/** 
    * s1,1210283218710,34 
    s1,1210283218730,24 
    s1,1210283218750,84 
    s1,1210283218780,54 
    s2,1210283218710,34 
    s2,1210283218730,24 
    s2,1210283218750,84 
    s2,1210283218780,54 
    s3,1210283218730,24 
    s3,1210283218750,84 
    s3,1210283218780,54 
    */ 
    val duplicateSensors = sc.textFile("sensor_data") 
    .map(line => line.split(",")).map(ar=>((ar(1),ar(2)),ar(0))) // (ts,val),sid 
    .aggregateByKey(List.empty[String])(_ :+_,_:::_)// grouped(ts,val)(List(n sid)) 
    .flatMapValues(l => l.sorted.combinations(2))// (ts,val)(List(2 sid combination)) 
    .map(_._2).countByValue() // List(s1, s3) -> 3, List(s2, s3) -> 3, List(s1, s2) -> 4 (2sensors, no of common entries) 
    // Now Do the filter .... grater than 50 
    duplicateSensors.foreach(println) 
} 

そして、あなたが共通の属性を持つペアがカウントされます。

+0

これは私が言及したデータセットのサイズでは機能しません。特に、このO(s^2)はsの数である。 – pathikrit

+1

コードまたは他のテキストをイメージとして掲載しないでください。 – m69

+0

画像としてのコードの投稿は非常に悪い考えです。 – Richard

4

これはSparkから抽象化された疑似コードです。あなたはまず自分のデータセットを並べ替えることができます。

select id, timestamp, data order by timestamp, data, id 

代表的な10行:

s1,100,a #1 
s2,100,a #2 
s3,100,a #3 
s4,100,b #4 
s1,101,a #5 
s3,101,b #6 
s4,101,b #7 
s2,101,a #8 
s3,102,b #9 
s4,102,b #10 

今すぐ上から下へ反復、そして限り、タイムスタンプとデータがあるとして、前のエントリと同じエントリのリストを作成します。私たちの例の行1-3フォームそのようなリストで

ので、我々はすでにいくつかの潜在的なペアを参照してください。

s1, s2 
s1, s3 
s2, s3 

行#4が持つ1つだけのエントリである(100、b)は、我々はそれをスキップすることができます。 行#5は(101、a)で1つのエントリのみをスキップできます。

行#6及び#7は、新たなペアである:

s3, s4 

また#9と#10形一対

は一緒にすべてを置く人は簡単ペアを数えることができる。

s1, s2 
s1, s3 
s2, s3 
s3, s4 
s3, s4 

この方法の利点は、ファイルをソートできる場合、ソートされたデータセットを複数の小さなチャンクに分割できることです(チャンクはグループ境界上で分割する必要があります。つまり、#1,2,3は1つのチャンクに入れる必要があります)ペアを計算し、最後のステップとして最後の結果を結合します。

こちらがお役に立てば幸いです。

0

これはどうやってやるの?次のようにデータを処理することができ、今

#!/usr/bin/env python3 
import random 

fout = open('test_data.csv','w') 

i=0 
for x in range(100000): 
    if i>=1000000: 
    break 
    for y in range(random.randint(0,100)): 
    i   = i + 1 
    timestamp = x 
    sensor_id = random.randint(0,50) 
    data  = random.randint(0,1000) 
    fout.write("{} {} {}\n".format(timestamp,sensor_id,data)) 

まず、いくつかの偽のデータを生成します。

あなたが行数がNも、ユニークなタイムスタンプの数がTこと、およびタイムスタンプごとにセンサーの期待数はSも聞かせている場合、各操作の複雑さは、コメントのようです私は一種の後期この回答に取り組んでいますが、ボトルネックが、少なくとも、表示する必要があるとして

import itertools 

#Turn a set into a list of all unique unordered pairs in the set, without 
#including self-pairs 
def Pairs(x): 
    temp = [] 
    x = list(x) 
    for i in range(len(x)): 
    for j in range(i+1,len(x)): 
     temp.append((x[i],x[j])) 
    return temp 

#Load data 
#O(N) time to load data 
fin  = sc.textFile("file:///z/test_data.csv") 
#Split data at spaces, keep only the timestamp and sensorid portions 
#O(N) time to split each line of data 
lines  = fin.map(lambda line: line.split(" ")[0:2]) 
#Convert each line into a timestamp-set pair, where the set contains the sensor 
#O(N) time to make each line into a timestamp-hashset pair 
monosets = lines.map(lambda line: (line[0],set(line[1]))) 
#Combine sets by timestamp to produce a list of timestamps and all sensors at 
#each timestamp 
#O(TS) time to place each line into a hash table of size O(T) where each 
#entry in the hashtable is a hashset combining 
timegroups = sets.reduceByKey(lambda a,b: a | b) 
#Convert sets at each timestamp into a list of all pairs of sensors that took 
#data at that timestamp 
#O(T S^2) time to do all pairs for each timestamp 
shared  = timegroups.flatMap(lambda tg: PairsWithoutSelf(tg[1])) 
#Associate each sensor pair with a value one 
#O(T S^2) time 
monoshared = shared.map(lambda x: (x,1)) 
#Sum by sensor pair 
#O(T S^2) time 
paircounts = monoshared.reduceByKey(lambda a,b: a+b) 
#Filter by high hitters 
#O(<S^2) time 
good  = paircounts.filter(lambda x: x[1]>5) 
#Display results 
good.count() 

時間の複雑さは、少し手で波打っています。

関連する問題