0

私はスパーク1.6を使用しています。データフレーム内でルックアップを実装する方法を知りたいと思います。スパークデータフレームのルックアップ

私は従業員&という2つのデータフレームを持っています。

  • 従業員DATAFRAME

    ------------------- 
    Emp Id | Emp Name 
    ------------------ 
    1 | john 
    2 | David 
    
  • 部門DATAFRAME

    -------------------- 
    Dept Id | Dept Name | Emp Id 
    ----------------------------- 
    1 | Admin | 1 
    2 | HR | 2 
    

私は、部門テーブルに従業員表からEMP IDを検索し、DEPT名を取得したいと思います。したがって、結果セットは

Emp Id | Dept Name 
------------------- 
1 | Admin 
2 | HR 

です。SPARKでこのルックアップUDF機能を実装するにはどうすればよいですか。私は両方のデータフレームでJOINを使用したくありません。

+0

を使用することができますあなたが必要なものは、二つのデータフレームを「参加」されています... 1が非常にある場合小さい場合は、ブロードキャスト結合を使用します。 –

+0

これまでのコード例はありますか? –

+0

これは、「結合」が意味することです。何か他の方法でこれを実装しようとする理由は何ですか? – maasg

答えて

4

コメントで既に述べたように、データフレームに参加することは、方法です。

ルックアップを使用できますが、ルックアップテーブルをドライバメモリに収集する必要がある、「分散型」ソリューションはないと思います。

import org.apache.spark.sql.functions._ 
import sqlContext.implicits._ 
import scala.collection.Map 

val emp = Seq((1,"John"),(2,"David")) 
val deps = Seq((1,"Admin",1),(2,"HR",2)) 

val empRdd = sc.parallelize(emp) 
val depsDF = sc.parallelize(deps).toDF("DepID","Name","EmpID") 


val lookupMap = empRdd.collectAsMap() 
def lookup(lookupMap:Map[Int,String]) = udf((empID:Int) => lookupMap.get(empID)) 

val combinedDF = depsDF 
    .withColumn("empNames",lookup(lookupMap)($"EmpID")) 

私の最初に考えたのはUDFにempRddを渡し、PairRDDに定義されたlookupメソッドを使用していたが、あなたは火花を持つことができないので、これは当然のことながら動作しません。また、このアプローチは、EmpIDをが一意であることを前提としています変換(すなわち、UDF)内のアクション(すなわち、lookup)。

EDIT:

あなたempDfは、複数の列(例えば名前、年齢)を持っている場合、あなたはこの

val empRdd = empDf.rdd.map{row => 
     (row.getInt(0),(row.getString(1),row.getInt(2)))} 


    val lookupMap = empRdd.collectAsMap() 
    def lookup(lookupMap:Map[Int,(String,Int)]) = 
     udf((empID:Int) => lookupMap.lift(empID)) 

    depsDF 
     .withColumn("lookup",lookup(lookupMap)($"EmpID")) 
     .withColumn("empName",$"lookup._1") 
     .withColumn("empAge",$"lookup._2") 
     .drop($"lookup") 
     .show() 
+0

ありがとうございます。あなたの例では、empRddはデータフレームではありません。 collectAsMap関数を使用するには、どのようにデータフレームをrddに変換しますか? – Prasan

+0

@Prasanデータフレームに 'rdd'メソッドがあるので、' val empRdd = empDf.rdd.map(row =>(row.getInt(0)、row.getString(1)))のようなものが必要です。 ' –

+0

うん、しかしcollectAsMapはRDDのメンバーではありません。 collectAsMapの代わりにcollectを使ってみると、lookup関数では使用できない配列が得られます。 – Prasan

2

あなたは既に次の手順を実行しデータフレームは、その非常に簡単持っていると言っているとおり:

1)sqlcontext

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

2を作成します)、すべての3例:用の一時テーブルを作成します。

EmployeeDataframe.createOrReplaceTempView("EmpTable") 

3)MySQLクエリを使用したクエリ

val MatchingDetails = sqlContext.sql("SELECT DISTINCT E.EmpID, DeptName FROM EmpTable E inner join DeptTable G on " + 
    "E.EmpID=g.EmpID") 
+0

応答をありがとう。私の記事で述べたように、私はJOINを使いたくありません。 – Prasan

+0

目的を果たしているときになぜあなたはjoinを使用しません。 – toofrellik

+0

これは私が投稿した例です。リアルタイムのシナリオでは、複雑なInformaticaマッピングをSPARKに変換しようとしています。私はSPARKでルックアップ機能(informaticaのようなjus)を複製しようとしています。 – Prasan

関連する問題