2017-12-08 9 views
0

関連する(しかし同じではない)キーの値の違いを調べようとしています。 - との違いを取得scala/sparkの異なるキーの値を比較する

(“John_1”,[“a”,”b”,”c”]) 
(“John_2”,[“a”,”b”]) 
(“John_3”,[”b”,”c”]) 
(“Mary_5”,[“a”,”d”]) 
(“John_5”,[“c”,”d”,”e”]) 

私は名前に名_番号の内容を比較したい_(1#):たとえば、私は、次のマップを持っていることを言うことができます。したがって、上記の例のために、私は(取得したいと思い例:

(“John_1”,[“a”,”b”,”c”]) //Since there is no John_0, all of the contents are new, so I keep them all 
(“John_2”,[]) //Since all of the contents of John_2 appear in John_1, the resulting list is empty (for now, I don’t care about what happened to “c” 
(“John_3”,[”c”]) //In this case, “c” is a new item (because I don’t care whether it existed prior to John_2). Again, I don’t care what happened to “a”. 
(“Mary_5”,[“a”,”d”]) //There is no Mary_4 so all the items are kept 
(“John_5”,[“c”,”d”,”e”]) //There is no John_4 so all the items are kept. 

私はaggregateByKeyのいくつかの種類をやって、その後、ちょうどリストの違いを見つけるに考えていたが、私が作る方法がわかりません私は名前を持つ、すなわち名_番号_、気にキーの間の一致(# - 1)

答えて

0

次のように私は私の問題を解決するために管理: まず現在のキー

def getPrevKey(k: String): String = { 
    val (n, h) = k.split(“_”) 
    val i = h.toInt 

    val sb = new StringBuilder 
    sb.append(n).append(“_”).append(i-1) 

    return sb.toString 
} 
から以前のキーを計算する関数を作成します

次に、シフトキーを使用してRDDのコピーを作成します。

val copyRdd = myRdd.map(row => { 
    val k1 = row._1 
    val v1 = row._2 

    val k2 = getPrevHour(k1) 
    (k2,v1) 
}) 

そして最後に、私は労働組合の両方RDDSとリストの間の差をとることによって、キーによって減らす:

val result = myRdd.union(copyRdd) 
    .reduceByKey(_.diff(_)) 

をこれは私が必要な正確な結果を取得しますが、それは、多くのを必要とするという問題があります組合のための記憶。最終的な結果はそれほど大きいわけではありませんが、部分的な結果は実際にプロセスの重さを左右します。

0

スプリット "ID":。

import org.apache.spark.sql.functions._ 

val df = Seq(
    ("John_1", Seq("a","b","c")), ("John_2", Seq("a","b")), 
    ("John_3", Seq("b","c")), ("Mary_5", Seq("a","d")), 
    ("John_5", Seq("c","d","e")) 
).toDF("key", "values").withColumn(
    "user", split($"key", "_")(0) 
).withColumn("id", split($"key", "_")(1).cast("long")) 

追加ウィンドウ:

val w = org.apache.spark.sql.expressions.Window 
    .partitionBy($"user").orderBy($"id") 

udf

val diff = udf((x: Seq[String], y: Seq[String]) => y.diff(x) 

と計算:

df 
    .withColumn("is_previous", coalesce($"id" - lag($"id", 1).over(w) === 1, lit(false))) 
    .withColumn("diff", when($"is_previous", diff(lag($"values", 1).over(w), $"values")).otherwise($"values")) 
    .show 

// +------+---------+----+---+-----------+---------+        
// | key| values|user| id|is_previous|  diff| 
// +------+---------+----+---+-----------+---------+ 
// |Mary_5| [a, d]|Mary| 5|  false| [a, d]| 
// |John_1|[a, b, c]|John| 1|  false|[a, b, c]| 
// |John_2| [a, b]|John| 2|  true|  []| 
// |John_3| [b, c]|John| 3|  true|  [c]| 
// |John_5|[c, d, e]|John| 5|  false|[c, d, e]| 
// +------+---------+----+---+-----------+---------+ 
+0

返信いただきありがとうございます。私は自分のデータが約2TBのRDDであると付け加えておきます。そうだとすれば、私は合体を使うことができないと思うか、それとも並列処理を打ち消すだろうと思いますよね? また、RDDは.toDFメソッドを持っていないようですが、それは私が使用できる別の同等のものですか? – jfmancilla

+0

これはSQL結合であり、 'RDD.coalesce'や' DataFrame.coalesce'ではありません。ウィンドウ関数はシャッフルされますが、私はそれについて多くを行うことはできません。 – user8371915

関連する問題