2017-12-16 10 views
2

Spark 2でこのような問題を解決しようとしていますが、解決策が見つかりません。 Spark - データフレームで定義されたルールを別のデータフレームに適用する方法

私は データフレームを持っている

+----+-------+------+ 
|id |COUNTRY| MONTH| 
+----+-------+------+ 
| 1 | US | 1 | 
| 2 | FR | 1 | 
| 4 | DE | 1 | 
| 5 | DE | 2 | 
| 3 | DE | 3 | 
+----+-------+------+ 

そしてデータフレームB:

+-------+------+------+ 
|COLUMN |VALUE | PRIO | 
+-------+------+------+ 
|COUNTRY| US | 5 | 
|COUNTRY| FR | 15 | 
|MONTH | 3 | 2 | 
+-------+------+------+ 

アイデアをするために、データフレームAにデータフレームBの "ルール" を適用することですこの結果を得る:

データフレームA '

+----+-------+------+------+ 
|id |COUNTRY| MONTH| PRIO | 
+----+-------+------+------+ 
| 1 | US | 1 | 5 | 
| 2 | FR | 1 | 15 | 
| 4 | DE | 1 | 20 | 
| 5 | DE | 2 | 20 | 
| 3 | DE | 3 | 2 | 
+----+-------+------+------+ 

私はそのようないろいろ書いてみました:

dfB.collect.foreach(r => 
    var dfAp = dfA.where(r.getAs("COLUMN") == r.getAs("VALUE")) 
    dfAp.withColumn("PRIO", lit(r.getAs("PRIO"))) 
) 

をしかし、私はそれが正しい方法ではないと確信しています。

スパークでこの問題を解決するための戦略は何ですか?

答えて

3

ルールセットがかなり小さい(最悪の場合のシナリオでは、データのサイズと生成される式のサイズが考えられる可能性があると仮定して)can crash the planner)最も簡単な解決策は、ローカルコレクションとマップそのSQL式に:

import org.apache.spark.sql.functions.{coalesce, col, lit, when} 

val df = Seq(
    (1, "US", "1"), (2, "FR", "1"), (4, "DE", "1"), 
    (5, "DE", "2"), (3, "DE", "3") 
).toDF("id", "COUNTRY", "MONTH") 

val rules = Seq(
    ("COUNTRY", "US", 5), ("COUNTRY", "FR", 15), ("MONTH", "3", 2) 
).toDF("COLUMN", "VALUE", "PRIO") 


val prio = coalesce(rules.as[(String, String, Int)].collect.map { 
    case (c, v, p) => when(col(c) === v, p) 
} :+ lit(20): _*) 

df.withColumn("PRIO", prio) 
+---+-------+-----+----+ 
| id|COUNTRY|MONTH|PRIO| 
+---+-------+-----+----+ 
| 1|  US| 1| 5| 
| 2|  FR| 1| 15| 
| 4|  DE| 1| 20| 
| 5|  DE| 2| 20| 
| 3|  DE| 3| 2| 
+---+-------+-----+----+ 

あなたは、それぞれ最小または最大のマッチング値を適用するleastまたはgreatestで​​3210を置き換えることができます。あなたはできるルールの大きなセットで

  • melt dataは長い形式に変換します。

    val dfLong = df.melt(Seq("id"), df.columns.tail, "COLUMN", "VALUE") 
    
  • joinの列と値。 (例えばminための)適切な集計機能付きidによってPRIOR集計

  • val priorities = dfLong.join(rules, Seq("COLUMN", "VALUE")) 
        .groupBy("id") 
        .agg(min("PRIO").alias("PRIO")) 
    
  • アウターidによってdfと出力を結合。

    df.join(priorities, Seq("id"), "leftouter").na.fill(20) 
    
    +---+-------+-----+----+ 
    | id|COUNTRY|MONTH|PRIO| 
    +---+-------+-----+----+ 
    | 1|  US| 1| 5| 
    | 2|  FR| 1| 15| 
    | 4|  DE| 1| 20| 
    | 5|  DE| 2| 20| 
    | 3|  DE| 3| 2| 
    +---+-------+-----+----+ 
    
0

dataframeBのルールとさせて頂きます、私はUDF

val code = udf{(x:String,y:Int)=>if(x=="US") "5" else if (x=="FR") "15" else if (y==3) "2" else "20"} 

df.withColumn("PRIO",code($"COUNTRY",$"MONTH")).show() 
を使用することにより、テーブルの下

+---+-------+------+ 
| id|COUNTRY|MONTH| 
+---+-------+------+ 
| 1|  US|  1| 
| 2|  FR|  1| 
| 4|  DE|  1| 
| 5|  DE|  2| 
| 3|  DE|  3| 
+---+-------+------+ 

のためのデータフレーム "DF" を作成しました

限られています

出力

+---+-------+------+----+ 
| id|COUNTRY|MONTH|PRIO| 
+---+-------+------+----+ 
| 1|  US|  1| 5| 
| 2|  FR|  1| 15| 
| 4|  DE|  1| 20| 
| 5|  DE|  2| 20| 
| 3|  DE|  3| 2| 
+---+-------+------+----+ 
関連する問題