2016-11-08 9 views
2

調査を行っているとき、Spark RDDのすべてのサブセットを削除するのはやや難しいことがわかります。spark RDDのサブセットを効率的に削除する方法

データ構造はRDD[(key,set)]です。例えば、それができる:

RDD[ ("peter",Set(1,2,3)), ("mike",Set(1,3)), ("jack",Set(5)) ]

マイク(Set(1,3))のセットはピーターの(Set(1,2,3))のサブセットであるので、私は

RDD[ ("peter",Set(1,2,3)), ("jack",Set(5)) ]

で終わるであろう、「マイク」を削除したいです

2つの "for"ループ操作で、Pythonでローカルに実装するのは簡単です。しかし、私がscalaとsparkでクラウドに拡張したい場合、良い解決策を見つけるのは簡単ではありません。

ありがとうございました

+0

がスパークAPIを使用してこれを行うために、我々は、デカルト積を使用して自身でデータを乗算し、結果の行列の各エントリを検証するに頼ることができますか? '(" peter "、Set(1,2,3))'& '(" olga "、Set(1,2,3))' – maasg

+0

ただ一つ削除してください。どちらが重要かは分かりません。 –

+0

提供されたソリューションは、両方を保持します。あなたはあなたの特定のニーズにそれを適応させるために招待されています。 – maasg

答えて

1

。セット間の部分集合演算は反射的ではないため、is "alice" subsetof "bob"is "bob" subsetof "alice"を比較する必要があります。あなたがタイで何をしますか

val data = Seq(("peter",Set(1,2,3)), ("mike",Set(1,3)), ("anne", Set(7)),("jack",Set(5,4,1)), ("lizza", Set(5,1)), ("bart", Set(5,4)), ("maggie", Set(5))) 
// expected result from this dataset = peter, olga, anne, jack 
val userSet = sparkContext.parallelize(data) 
val prod = userSet.cartesian(userSet) 
val subsetMembers = prod.collect{case ((name1, set1), (name2,set2)) if (name1 != name2) && (set2.subsetOf(set1)) && (set1 -- set2).nonEmpty => (name2, set2) } 
val superset = userSet.subtract(subsetMembers)  

// lets see the results: 
superset.collect() 
// Array[(String, scala.collection.immutable.Set[Int])] = Array((olga,Set(1, 2, 3)), (peter,Set(1, 2, 3)), (anne,Set(7)), (jack,Set(5, 4, 1))) 
-3

マップの後にフィルタを使用できます。

削除したいものの値を返すマップのように構築できます。

def filter_mike(line): 
    if line[1] != Set(1,3): 
     return line 
    else: 
     return None 

次に、あなたはこのようになりましフィルタリングすることができます:

your_rdd.map(filter_mike).filter(lambda x: x != None) 

これはこれはRDD.fold機能を使用することによって達成することができ

+0

質問をもう一度読んでください。 – shanmuga

1

に動作します最初の関数を構築します。
この場合、必要な出力はスーパーセットの「リスト」(ItemList)です。このため入力も「リスト」(ITEMLISTのRDD)に変換する必要があり、私たちは(非分散アルゴリズムにおける二重ループに相当)お互いにそれぞれの要素を比較することに脱出することができます疑う

import org.apache.spark.rdd.RDD 

// type alias for convinience 
type Item = Tuple2[String, Set[Int]] 
type ItemList = List[Item] 

// Source RDD 
val lst:RDD[Item] = sc.parallelize(List(("peter",Set(1,2,3)), ("mike",Set(1,3)), ("jack",Set(5)))) 


// Convert each element as a List. This is needed for using fold function on RDD 
// since the data-type of the parameters are the same as output parameter 
// data-type for fold function 
val listOflst:RDD[ItemList] = lst.map(x => List(x)) 

// for each element in second ItemList 
// - Check if it is not subset of any element in first ItemList and add first 
// - Remove the subset of newly added elements 
def combiner(first:ItemList, second:ItemList) : ItemList = { 
    def helper(lst: ItemList, i:Item) : ItemList = { 
     val isSubset: Boolean = lst.exists(x=> i._2.subsetOf(x._2)) 
     if(isSubset) lst else i :: lst.filterNot(x => x._2.subsetOf(i._2)) 
    } 
    second.foldLeft(first)(helper) 
} 


listOflst.fold(List())(combiner) 
関連する問題