2016-03-29 5 views
0

私の最初の値がidで、2番目がstuffであるスケーリングジョブにrecords:TypedType[(String, util.List[String])]があります。次想像:私は出力に指定したidのために互いに異なるレコードのみを希望ScaldingでList [String]のdiffを生成する

("1", ["a","b","c"]) 
("1", ["a","b","c"]) 
("1", ["a","b","c"]) 
("2", ["a","b"]) 
("2", ["a","b","c"]) 
("3", ["a","b","c"]) 

records.groupBy(_._1)後。上の入力の場合、出力は次のようになります。

("2", ["a","b"]) 
("2", ["a","b","c"]) 

私はスケーリングを初めてとしています。これを達成するためのエレガントな方法は何ですか?

答えて

0

火傷の側面は、あなたにとって非常に重要であるかどうかは知りません(あなたのコレクションは非常に巨大である?)が、プレーン古いScalaで私がやるだろう:

// Given: 
val records = Seq("1" -> List("a", "b", "c"), "1" -> List("a", "b", "c"), "1" -> List("a", "b", "c"), "2" -> List("a", "b"), "2" -> List("a", "b", "c"), "3" -> List("a", "b", "c"), "3" -> List("d") 

val distinctValues = records.groupBy(_._1).map { case (k, v) => k -> v.toSet } 
// => Map(2 -> Set((2,List(a, b)), (2,List(a, b, c))), 1 -> Set((1,List(a, b, c))), 3 -> Set((3,List(a, b, c)), (3,List(d)))) 

val havingMultipleDistinct = distinctValues.map { case (k, v) => v.size > 1 } 
// => Map(2 -> Set((2,List(a, b)), (2,List(a, b, c))), 3 -> Set((3,List(a, b, c)), (3,List(d)))) 

val asRecords = havingMultipleDistinct.values.flatten 
// => List((2,List(a, b)), (2,List(a, b, c)), (3,List(a, b, c)), (3,List(d))) 
+0

これはクラスタ上で実行する必要があります。スケーリングは基本的です – Gevorg

0

それぞれの値の大きさであれば鍵はメモリに収まるほど小さく、そしてこのようなものは、それを行う必要があります。

records 
    .group 
    .toSet 
    .filter(_.size > 1) 
    .flatten 

それが大きすぎる場合は、あなたが自分自身とのパイプを結合することができます。

val grouped = records.group 
grouped 
.join(grouped) 
.collect { case(k, (a, b)) if a != b => k -> a }