2017-05-16 6 views
0

私は単一のデータフレームに変換したいデータフレームのリストを持っています。そのデータフレームとデータタイプの特定の列新しいデータフレームのようにする必要があります.DfListはList [sql.Dataframe]です。助けていただければ幸いです。以下はスカラ、データフレームのリストを単一のデータフレームに変換し、別のデータフレームと特定の列でマージする

dfList[sql.Dataframe]=List([A: int]:Dataframe, [B: string]:Dataframe, [C: long]:Dataframe, [D: string]:Dataframe) 

dfList = List(+-------+----------+--------+--------+ 
       | A |  B |  C | D | 
       +-------+----------+--------+--------+ 
       |  41| 912AEQ| 2016022|  UJ| 
       |  82| 912ARD| 2016022|  GH| 
       | 903| 912AYQ| 2016022|  KL| 
       | 454| 912AKK| 2016022|  KL| 
       |  95| 912AHG| 2016022|  KH| 
       +-------+----------+--------+--------+) 

the data type of df is Id: int, v1: string, v2: long, v3: string 

df[Dataframe] =    
+---+---+-----------+-----+ 
| Id| v1| v2  | v3 | 
+---+---+-----------+-----+ 
| 11| AS| 0989765498|SDAWQ| 
| 12| GH| 7654998599|TRUDR| 
| 13| IO|10654998580|ABUCK| 
| 14|1JG|65499855101|KLBCK| 
| 15| RT|10265499852|BCKKL| 
+---+---+-----------+-----+    

The newDF will be combination of dfList and df. 
The datatype of newDF should be Id: int, A: int, B: string, C: long, D: string 


newDF = 
    +---+------+----------+--------+--------+ 
    | Id| A |  B |  C | D | 
    +---+------+----------+--------+--------+ 
    | 11| 41| 912AEQ| 2016022|  UJ| 
    | 12| 82| 912ARD| 2016022|  GH| 
    | 13| 903| 912AYQ| 2016022|  KL| 
    | 14| 454| 912AKK| 2016022|  KL| 
    | 15| 95| 912AHG| 2016022|  KH| 
    +---+------+----------+--------+--------+ 

答えて

0

次の2つのデータフレームを結合する任意のキーを持っていないので、あなたは、両方のインデックス列を追加し、それらを結合し、最後にインデックスを削除する必要があり、完全なソリューションです。削減と結合によって、データフレームのリストから単一のデータフレームを作成します。

import spark.implicits._ 

    //Create dfList dataframe 

    val d1 = spark.sparkContext 
    .parallelize(
     Seq(
     (41, "912AEQ", 2016022l, "UJ"), 
     (82, "912ARD", 2016022l, "GH"), 
     (903, "912AYQ", 2016022l, "KL") 
    )) 
    .toDF("A", "B", "C", "D") 


    val d2 = spark.sparkContext 
    .parallelize(
     Seq(
     (454, "912AKK", 2016022l, "KL"), 
     (95, "912AHG", 2016022l, "KH") 
    )) 
    .toDF("A", "B", "C", "D") 

    val allD = List(d1, d2) //list of dataframe 

    //create a singe dataframe from list of dataframe 
    val dfList = allD.reduce(_ union _) 


    //Create df dataframe 
    val df = spark.sparkContext 
    .parallelize(
     Seq(
     (11, "AS", 989765498l, "SDAWQ"), 
     (12, "GH", 7654998599l, "TRUDR"), 
     (13, "IO", 10654998580l, "ABUCK"), 
     (14, "1JG", 65499855101l, "KLBCK"), 
     (15, "RT", 10265499852l, "BCKKL") 
    )) 
    .toDF("id", "v1", "v2", "v3") 

    val dfListWithIndex = addIndex(dfList) // and index column 
    val dfWithIndex = addIndex(df).drop("v1", "v2", "v3") //add index column and remove unnecessary columns 

    val newDF = dfWithIndex.join(dfListWithIndex, "index").drop("index") //join two dataframe and drop index column 


    dfListWithIndex.show() 
    dfWithIndex.show() 
    newDF.printSchema() 
    newDF.show 

    def addIndex(df: DataFrame) = spark.sqlContext.createDataFrame(
    // Add index column 
    df.rdd.zipWithIndex.map { 
     case (row, index) => Row.fromSeq(row.toSeq :+ index) 
    }, 
    // Create schema for index column 
    StructType(df.schema.fields :+ StructField("index", LongType, false)) 
) 
+0

返信ありがとうdfList is List [sql.Dataframe] –

+0

データフレームのリストの回答を更新しました。確認してください。 –

+0

Shankarありがとう、dfListは、データフレームのリストです、更新された投稿を参照してください。 –

関連する問題