2017-01-03 5 views
1

テキストファイルからDataframeを作成したい。ケースクラスを使用しないでDataFrameを作成する方法は?

ケースクラスには22文字の制限があります。私は100以上のフィールドを持っています。

したがって、私はケースクラスを作成する際に問題に直面しています。

実際のターゲットはcreate Dataframeです。

ケースクラスを使用せずに、Dataframeを作成する他の方法はありますか?

答えて

3

1つの方法は、ファイルを直接読み取ってデータフレームを作成するためにspark csvパッケージを使用することです。ファイルにヘッダーがある場合、または構造体タイプを使用してカスタムスキーマを作成できる場合、パッケージはヘッダーからスキーマを直接推論します。

以下の例では、私はカスタムスキーマを作成しました。

databricks spark csv documentation pageの他のさまざまなオプションを確認できます。

その他のオプション:

あなたは上記のように構造体タイプを使用してスキーマを作成し、データフレームを作成するためにsqlContextのcreateDataframeを使用することができます。ケースクラスは、(例えば、レコードの構造を文字列でエンコードされ、またはテキストデータセットが解析され、フィールドが異なる投影される事前に定義することができない

val vRdd = sc.textFile(..filelocation..) 
val df = sqlContext.createDataframe(vRdd,schema) 
+0

Rajatありがとうございました....上記のように "customSchema"を使用して100個のフィールドのスキーマを作成することは可能でしょうか? –

+0

@SiddharthArekarうん、あなたはそれを行うことができます。ちょうどクエリは、あなたがスキーマを作成しているか、またはファイルに基づいて、ハイブテーブルを持っています –

+0

私はスキーを持つハイブテーブルを持っていない...これは100プラスフィールドを持つテキストファイルです....私はこのテキストファイルを使用してデータフレームを作成したい –

2

From the Spark Documentation:

異なるユーザーの場合)、DataFrameは3つの手順でプログラムで作成できます。

  1. 元のRDDからRDDの行を作成します。
  2. ステップ1
  3. で作成RDD内の行の構造と一致StructTypeSQLContextによって提供createDataFrame方法を介して行のRDDにスキーマを適用することによって表されるスキーマを作成します。

その他の方法StructType内datatyoeでStructFieldを定義することです。複数のデータ型を定義することができます。両方の実装について下記の例を参照してください。両方の実装を理解するためにコメント付きのコードも検討してください。

package com.spark.examples 

import org.apache.spark._ 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql._ 
import org.apache.spark._ 
import org.apache.spark.sql.DataFrame 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql._ 
import org.apache.spark.sql.types._ 

// Import Row. 
import org.apache.spark.sql.Row; 
// Import Spark SQL data types 
import org.apache.spark.sql.types.{ StructType, StructField, StringType } 

object MultipleDataTypeSchema extends Serializable { 

    val conf = new SparkConf().setAppName("schema definition") 

    conf.set("spark.executor.memory", "100M") 
    conf.setMaster("local") 

    val sc = new SparkContext(conf); 
    // sc is an existing SparkContext. 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    def main(args: Array[String]): Unit = { 

    // Create an RDD 
    val people = sc.textFile("C:/Users/User1/Documents/test") 

    /* First Implementation:The schema is encoded in a string, split schema then map it. 
    * All column dataype will be string type. 

    //Generate the schema based on the string of schema 
    val schemaString = "name address age" //Here you can read column from a preoperties file too. 
    val schema = 
     StructType(
     schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)));*/ 

    // Second implementation: Define multiple datatype 

    val schema = 
     StructType(
     StructField("name", StringType, true) :: 
      StructField("address", StringType, true) :: 
      StructField("age", StringType, false) :: Nil) 

    // Convert records of the RDD (people) to Rows. 
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim, p(2).trim)) 
    // Apply the schema to the RDD. 
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema) 
    peopleDataFrame.printSchema() 

    sc.stop 

    } 
} 

その出力:

17/01/03 14:24:13 INFO SparkContext: Created broadcast 0 from textFile at MultipleDataTypeSchema.scala:30 
root 
|-- name: string (nullable = true) 
|-- address: string (nullable = true) 
|-- age: string (nullable = false) 
+0

あなたの答えはサンデップ! –

+0

@SiddharthArekar:ありがとうとは別に、投票に気をつけることもできます。 –

0

sqlContextのsqlContext.read.csv()メソッドを介してファイルを読むには適しています。パラメータを渡して実行を制御できる組み込みメソッドが多数用意されています。しかし、1.6より前のsparkバージョンではこれを利用できないかもしれません。ですから、あなたはspark-contextのtextFileメソッドによってそれを行うかもしれません。

Val a = sc.textFile("file:///file-path/fileName") 

これはあなたにRDD [String]を与えます。RDDを作成し、これをデータフレームに変換する必要があります。

ここで、StructTypesを使用してRDDのスキーマを定義してください。これにより、必要な数のStructFieldを持つことができます。

val schema = StructType(Array(StructField("fieldName1", fieldType, ifNullablle), 
           StructField("fieldName2", fieldType, ifNullablle), 
           StructField("fieldName3", fieldType, ifNullablle), 
           ................ 
          )) 

ここでは、1)RDD(textFileメソッドを使用して作成したもの)があります。 2)必要な数の属性を持つスキーマ。

次の手順は、間違いなくあなたのRDDの権利とこのスキーマをマップすることです! あなたが持っているRDDが単一のString、つまりRDD [String]であることがわかります。しかし、これで実際にやりたいことは、スキーマを作成した多くの変数に変換することです。だから、コンマに基づいてRDDを分割しないでください。次の式は、マップ操作を使用してこれを行う必要があります。

val b = a.map(x => x.split(",")) 

評価時にRDD [Array [String]]が表示されます。

しかし、このArray [String]はまだ私が操作を適用するのに直感的ではないと言うかもしれません。 これで、あなたの休暇にRow APIが提供されます。インポートorg.apache.spark.sql.Row を使用してインポートします。実際には、分割されたRDDとRowオブジェクトをタプルとしてマッピングします。これを参照してください:

import org.apache.spark.sql.Row 
val c = b.map(x => Row(x(0), x(1),....x(n))) 

上記の式は、各要素が行であるRDDを示します。今すぐスキーマを与えるだけです。ここでもsqlContextのcreateDataFrameメソッドが簡単にジョブを処理します。

val myDataFrame = sqlContext.createDataFrame(c, schema) 

このメソッドは、次の2つのパラメータをとります。1)作業が必要なRDD。 2)その上に適用するスキーマ。 結果の評価はDataFrameオブジェクトです。 最後に、DataFrameオブジェクトmyDataFrameを作成しました。 myDataFrameでshowメソッドを使用すると、データが表形式で表示されます。 これで、spark-sql操作を実行することができます。