2016-08-11 5 views
0

Spark 1.5.2を使用して、以下の構文を使用してscalaオブジェクトからデータフレームを作成しています。私の目的は、単体テスト用のデータを作成することです。代わりにSpark:SQLコンテキスト:Scalaオブジェクトからデータフレームを作成する

class Address (first:String = null, second: String = null, zip: String = null){} 
class Person (id: String = null, name: String = null, address: Seq[Address] = null){} 

def test() = { 

    val sqlContext = new SQLContext(sc) 

    import sqlContext.implicits._ 

    val persons = Seq(
    new Person(id = "1", name = "Salim", 
     address = Seq(new Address(first = "1st street"))), 
    new Person(name = "Sana", 
     address = Seq(new Address(zip = "60088"))) 
) 

    // The code can't infer schema automatically 
    val claimDF = sqlContext.createDataFrame(sc.parallelize(persons, 2),classOf[Person]) 

    claimDF.printSchema() // This prints "root" not the schema of Person. 
} 

私は人を変換し、ケースクラスにアドレスならば、スパークは、上記の構文を使用して、またはsc.parallelize(persons, 2).toDFを使用するか、または私ができるsqlContext.createDataFrame(sc.parallelize(persons, 2),StructType)

を使用して自動的にのスキーマを継承することができますそれは20以上のフィールドを保持することができず、クラス内に多くのフィールドを持っているため、ユースケースクラスを使用していません。また、StructTypeを使用すると多くの不都合が生じます。ケースクラスは最も便利ですが、あまりに多くのプロパティを保持することはできません。

ご協力いただきありがとうございます。

+0

。 (このシグネチャのために 'createDataFrame [A <:Product](data:Seq [A])') –

答えて

0

コードを2回変更すると、caseクラスを使用せずにprintSchema()がデータフレームの完全な構造を生成します。あなたが.toDFを使用して、データフレームを作成する必要があり、第二に

class Address (first:String = null, second: String = null, zip: String = null) extends Product with Serializable 
{ 
    override def canEqual(that: Any) = that.isInstanceOf[Address] 
    override def productArity: Int = 3 
    def productElement(n: Int) = n match { 
    case 0 => first; case 1 => second; case 2 => zip 
    } 
} 

class Person (id: String = null, name: String = null, address: Seq[Address] = null) extends Product with Serializable 
{ 
    override def canEqual(that: Any) = that.isInstanceOf[Person] 
    override def productArity: Int = 3 
    def productElement(n: Int) = n match { 
    case 0 => id; case 1 => name; case 2 => address 
    } 
} 

:ダニエルが提案されているよう

まず、あなたがあなたのクラスを持っているscala.Productトレイトを拡張必要があります(痛みを伴うが、以下.toDF方法のために必要)むしろそうようsqlContext.createDataFrame(..)を使用するよりimport sqlContext.implicits._と範囲になる暗黙方法

val claimDF = sc.parallelize(persons, 2).toDF 

次いでclaimDF.printSchema()が印刷されます:

root 
|-- id: string (nullable = true) 
|-- name: string (nullable = true) 
|-- address: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- first: string (nullable = true) 
| | |-- second: string (nullable = true) 
| | |-- zip: string (nullable = true) 

また、Scala 2.11.0-M3を使用すると、ケースクラスのフィールドの制限22個を削除できます。

1

入力いただきありがとうございます。

大規模なケースクラスをサポートしていたScala 2.11を使用して最終的にSpark 2.1に移行したため、この問題は解決しました。

Spark 1.6とScala 2.10では、Dataframeを構築するために行オブジェクトと構造型を構築しました。私はあなたのクラスは、[製品特性](http://www.scala-lang.org/api/2.10.6/#scala.Product)を拡張し、その抽象メソッドを実装する場合、それがうまくいくかもしれないと思い

val rows = Seq(Row("data")) 
val aRDD = sc.parallelize(rows) 
val aDF = sqlContext.createDataFrame(aRDD,getSchema()) 

def getSchema(): StructType= { 
    StructType(
     Array(
      StructField("jobNumber", StringType, nullable = true)) 
    ) 
} 
関連する問題