2017-10-02 13 views
0

私はスパーク(時間)のために新しく、さらにScalaでは経験の浅いです。しかし、私は両方のことにもっと慣れ親しんでくれることを切望してきました。スパーク結合アレイ

私はかなり軽いタクを持っています。私は2つのJSONファイルからインポートする2つのデータフレームを持っています。 1つはuuid,text,tag_idsと、もう1つはタグid,termです。uuid、text、tag_ids、tag_termsを含むsolrにインポートできる新しいjsonファイルを作成したいと思います。

val text = spark.sqlContext.jsonFile("/tmp/text.js") 
val tags = spark.sqlContext.jsonFile("/tmp/tags.js") 



text.printSchema() 

root 
| -- uuid: string (nullable = true) 
| -- tag_ids: array (nullable = true) 
| | -- element: string (contiansNull = true) 
| -- text: string (nullable = true) 

tags.printSchema() 
root 
| -- id: string (nullable = true) 
| -- term: string (nullable = true) 


#desired output 
+--------------------+------+---------+------------+ 
|    uuid| text | tag_ids | tag_terms| 
+--------------------+------+---------+------------+  
|cf5c1f4c-96e6-4ca...| foo | [1,2]| [tag1,tag2]|  
|c9834e2e-0f04-486...| bar | [2,3]| [tag2,tag3]| 
+--------------------+--------------+--------------+ 

私が試したことをすべて示すのは難しいです。本質的に.join()は配列であるtag_idsに問題があります。私はexplode()tag_idsに参加してtag_termsに参加することができますが、それを新しいdfに再組み立てしてもまだ私のレベルを超えています。 explodeを使用して

答えて

1

ソリューション:

val result = text 
    .withColumn("tag_id", explode($"tag_ids")) 
    .join(tags, $"tag_id" === $"id") 
    .groupBy("uuid", "tag_ids") 
    .agg(first("text") as "text", collect_list("term") as "tag_terms") 
+0

おかげでこれが働いていました。私の実際のデータには、さらにいくつかの列があり、いくつかの行にはnull値が入っていたか、実際のtag_idはありませんでした。これは、左の結合で簡単に解決されました。 – matchew

0

はこれを試してみてください:

import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.{SQLContext, SparkSession} 
import org.apache.spark.{SparkConf, SparkContext} 

import spark.implicits._ 

val text = spark.sqlContext.jsonFile("/tmp/text.js") 
val tags = spark.sqlContext.jsonFile("/tmp/tags.js") 

val df1 = spark.sparkContext.parallelize(text, 4).toDF() 
val df2 = spark.sparkContext.parallelize(tags, 4).toDF() 

df1.createOrReplaceTempView("A") 
df2.createOrReplaceTempView("B") 


spark.sql("select d1.key,d1.value,d2.value1 from A d1 inner join B d2 on d1.key = d2.key").show() 
関連する問題