2016-11-09 14 views
-1

私はこのようなRDDを持っています:RDD[(Any, Array[(Any, Any)])] 私はただDataFrameに変換したいだけです。私が使用している場合RDDをデータフレームに変換します

(B,List((3,12/06/2012), (4,17/06/2012)))  
(A,List((1,12/06/2012), (2,13/06/2012)))  
(C,List((5,14/06/2012))) 

またはこの

(A,[Lscala.Tuple2;@3e8f27c9) 
(C,[Lscala.Tuple2;@6f22defb) 
(B,[Lscala.Tuple2;@1b8692ec) 

のような:

.mapValues(i => i.toArray) 

を私はすでにこれをしようとこのように私は私のRDDは次のようになり、このスキーマに

val schema = StructType(Array (StructField("C1", StringType, true), StructField("C4", ArrayType(StringType, false), false))) 

val df = Seq(
    ("A",1,"12/06/2012"), 
    ("A",2,"13/06/2012"), 
    ("B",3,"12/06/2012"), 
    ("B",4,"17/06/2012"), 
    ("C",5,"14/06/2012")).toDF("C1", "C2","C3") 
df.show(false) 

val rdd = df.map(line => (line(0), (line(1), line(2)))) 
    .groupByKey() 
    .mapValues(i => i.toList).foreach(println) 

val output_df = sqlContext.createDataFrame(rdd, schema) 

を使用します:

val output_df = sqlContext.createDataFrame(rdd, schema) 

しかし、私は得る:ラファエル・ロートへ

Error:(40, 32) overloaded method value createDataFrame with alternatives: 
    (data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and> 
    (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and> 
    (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and> 
    (rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and> 
    (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and> 
    (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame 
cannot be applied to (Unit, org.apache.spark.sql.types.StructType) 
    val output_df = sqlContext.createDataFrame(rdd, schema) 


は、第2の方法ウィッヒが機能しないしようとした私が取得:

Error:(41, 24) No TypeTag available for MySchema 
    val newdf = rdd.map(line => MySchema(line._1.toString, line._2.asInstanceOf[List[(Int, String)]])).toDF() 

第一の方法の作業罰金が、私は失われました私のタプルの最初の要素は.mapValues(i => i.map(_._2))

です

val rdd = df.map(line => (line(0), (line(1), line(2)))).groupByKey() 
     .mapValues(i => i.map(w => (w._1,w._2).toString)) 
     .map(i=>Row(i._1,i._2)) 

ありがとうございます:最初の方法は、二つの要素

私はそれが文字列で私のタプルを変換するが、これは私がコラムを読むために私の文字列のタプルを分割する必要がありますので、私によるとエレガントな解決策ではない解決を維持しますあなたの助けを借りて

+0

[rddオブジェクトをsparkのデータフレームに変換する方法]の複製(http://stackoverflow.com/questions/29383578/how-to-convert-rdd-object-to-dataframe-in-spark) – cheseaux

+3

私はそれが役に立つと思うあなたが質問 – maasg

+0

@ aにエラーを追加した場合。 moussaが「MySchemaで使用できるTypeTagがない」を解決するには、メインメソッドの外にケースクラスを定義する必要があります(もしあれば) –

答えて

0

GroupByKeyはタプルのSeqを与えますが、あなたはこのスキルを考慮しませんでした。さらに、sqlContext.createDataFrameには、あなたが提供していないRDD[Row]が必要です。

これはあなたのschemaを使用して動作するはずです:

val rdd = df.map(line => (line(0), (line(1), line(2)))) 
    .groupByKey() 
    .mapValues(i => i.map(_._2)) 
    .map(i=>Row(i._1,i._2)) 

val output_df = sqlContext.createDataFrame(rdd, schema) 

をあなたはまた、タプルをマッピングするために使用することができますcase classを使用することができます(タプルスキーマのかわからないが、プログラムを作成することができます):

可能
val df = Seq(
     ("A", 1, "12/06/2012"), 
     ("A", 2, "13/06/2012"), 
     ("B", 3, "12/06/2012"), 
     ("B", 4, "17/06/2012"), 
     ("C", 5, "14/06/2012")).toDF("C1", "C2", "C3") 
    df.show(false) 

    val rdd = df.map(line => (line(0), (line(1), line(2)))) 
     .groupByKey() 
     .mapValues(i => i.toList) 

    // this should be placed outside of main() 
    case class MySchema(C1: String, C4: List[(Int, String)]) 

    val newdf = rdd.map(line => MySchema(line._1.toString, line._2.asInstanceOf[List[(Int, String)]])).toDF() 
+0

こんにちは、あなたの答えをありがとう、それは動作しません、私はあなたの発言。あなたは何か考えているなら、本当に役に立ちます。 –

+0

ありがとう、MySchemaを自分のメソッドの外に移動すると非常にうまく動作します –

関連する問題