2017-07-30 13 views
1

異なる列の列を持つ2つのスパークデータフレームを結合しようとしています。この目的のために、私は次のリンクを参照: -異なる列の2つのスパークデータフレームの結合

how to union 2 spark dataframes with different amounts of columns

次のように私のコードがある -

val cols1 = finalDF.columns.toSet 
val cols2 = df.columns.toSet 
val total = cols1 ++ cols2 
finalDF=finalDF.select(expr(cols1, total):_*).unionAll(df.select(expr(cols2, total):_*)) 

def expr(myCols: Set[String], allCols: Set[String]) = { 
    allCols.toList.map(x => x match { 
    case x if myCols.contains(x) => col(x) 
    case _ => lit(null).as(x) 
    }) 
} 

をしかし、私は直面しています問題は、両方のデータフレーム内の列のいくつかのネストされています。私はStructTypeとプリミティブ型の両方の列を持っています。さて、列A(StructTypeの)はdfにあり、finalDFにはありません。しかしexprでは、

case _ => lit(null).as(x) 

はStructTypeではありません。だから私はそれらを組合わせることができません。それは私に次のエラーを与えています -

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. NullType <> StructType(StructField(_VALUE,StringType,true), StructField(_id,LongType,true)) at the first column of the second table. 

私はここで何ができるでしょうか?

+0

このstackoverflowの[スレッド](https://stackoverflow.com/questions/42530431/spark-union-fails-with-nested-json-dataframe)を確認してください。 –

+0

@ NinjaDev82はい、すべてのファイルを同時にインポートすることができます。しかし、私は同じファイルのヘッダーから抽出された値を持つ列を追加する必要があります(私はxmlファイルをsparkデータフレームにインポートしています)。 それで、データをsparkデータフレームにインポートして、そこからヘッダーを抽出し、その値をそのdfの新しい列に追加しています。このヘッダー値は、dfごとに異なります。 – Ishan

答えて

1

これには、組み込みのスキーマの推論を使用します。これは、より高価な方法ですが、競合の可能性を持つ複雑な構造を、マッチングよりもはるかに簡単:

spark.read.json(df1.toJSON.union(df2.toJSON)) 

あなたはinput_file_nameを使用して、ヘッダから抽出された情報を用いても、同時にすべてのファイルをインポートし、joinことができます。

import org.apache.spark.sql.function 

val metadata: DataFrame // Just metadata from the header 
val data: DataFrame  // All files loaded together 

metadata.withColumn("file", input_file_name) 
    .join(data.withColumn("file", input_file_name), Seq("file")) 
関連する問題