2017-03-09 13 views
0

pyspark dfdataに2つのデータフレームがあります。スキーマは 2つのデータフレームのカラム名を比較する

以下
>>> df.printSchema() 
root 
|-- id: integer (nullable = false) 
|-- name: string (nullable = true) 
|-- address: string (nullable = true) 
|-- nation: string (nullable = true) 
|-- Date: timestamp (nullable = false) 
|-- ZipCode: integer (nullable = true) 
|-- car: string (nullable = true) 
|-- van: string (nullable = true) 

>>> data.printSchema() 
root 
|-- id: integer (nullable = true) 
|-- name: string (nullable = true) 
|-- address: string (nullable = true) 
|-- nation: string (nullable = true) 
|-- date: string (nullable = true) 
|-- zipcode: integer (nullable = true) 

のようなものです今私は、スキーマの両方を比較することによって、私のdataデータフレームに列車やバンを追加したいです。

カラムが同じ場合は何もしませんが、カラムが異なる場合は、カラムを持たないデータフレームにカラムを追加します。

どのようにpysparkでこれを達成できますか?

列がデータフレームに付加されるとFYI私は火花1.6

を使用しています。新しく追加されたデータフレーム内のこれらの列の値はnullにする必要があります。

ここで例えばデータのデータフレームの列車やバンは、null値が含まれている必要がありますが、DFデータフレーム内の同じ列があった場合に何が起こるか、元の値

を持っている必要がありますので、我々はdataデータフレームに列を追加しますスキーマはありませんが、StructTypeがStructFieldsのリストからなるように、我々は不足している列を比較して検索するには、フィールドのリストを取得することができます

答えて

0

を追加する2つの以上の新しい列、

df_schema = df.schema.fields 
data_schema = data.schema.fields 
df_names = [x.name.lower() for x in df_scehma] 
data_names = [x.name.lower() for x in data_schema] 
if df_schema <> data_schema: 
    col_diff = set(df_names)^set(data_names)  
    col_list = [(x[0].name,x[0].dataType) for x in map(None,df_schema,data_schema) if ((x[0] is not None and x[0].name.lower() in col_diff) or x[1].name.lower() in col_diff)] 
    for i in col_list: 
     if i[0] in df_names: 
      data = data.withColumn("%s"%i[0],lit(None).cast(i[1])) 
     else: 
      df = df.withColumn("%s"%i[0],lit(None).cast(i[1])) 
else: 
    print "Nothing to do" 
です

ヌル値がなく、スキーマの違いがNULL可能な列であるため、そのチェックを使用していない場合は、列を追加すると述べました。あなたがそれを必要とする場合は、以下のようにヌル値を許可するためのチェックを追加し、あなたが複数のテーブルにこれを行う必要がある場合は、それはあるかもしれない、StructTypeとStructFieldsについての詳細は https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.types.StructType

+0

上記の回答では、違いとして2列しかありません。 2つ以上の列がある場合はどうなりますか。どのように動的に渡すことができますか – User12345

+0

上記のコードは、2つのスキーマの違いを動的に取ります。どの列が相違点として存在しますか?すべてが考慮されます。 – Suresh

+0

@Suresh初期データフレームがハイブテーブルから来たと仮定してテーブルを変更する方法を教えてください。データフレームに列を追加する代わりに、既存のハイブテーブルにヌル値を追加できますか? –

0

をドキュメントをチェック

col_list = [(x[0].name,x[0].dataType) for x in map(None,df_schema,data_schema) if (x[0].name.lower() in col_diff or x[1].name.lower() in col_diff) and not x.nullable] 

してくださいコードを一般化する価値があります。このコードは、一致しないソース列の最初の非NULL値を使用して、ターゲット表に新しい列を作成します。

from pyspark.sql.functions import lit, first 

def first_non_null(f,t): # find the first non-null value of a column 
    return f.select(first(f[t], ignorenulls=True)).first()[0] 

def match_type(f1,f2,miss): # add missing column to the target table 
    for i in miss: 
     try: 
      f1 = f1.withColumn(i, lit(first_non_null(f2,i))) 
     except: 
      pass 
     try: 
      f2 = f2.withColumn(i, lit(first_non_null(f1,i))) 
     except: 
      pass 
    return f1, f2 

def column_sync_up(d1,d2): # test if the matching requirement is met 
    missing = list(set(d1.columns)^set(d2.columns)) 
    if len(missing)>0: 
     return match_type(d1,d2,missing) 
    else: 
     print "Columns Match!" 

df1, df2 = column_sync_up(df1,df2) # reuse as necessary 
関連する問題