2017-07-07 6 views
0

ファイルをsparkにロードしようとしています。 私は以下のようにスパークに通常のテキストファイルをロードする場合:Sparkのデータセットにスキーマを追加するには?

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile") 

結果は次のとおりです。

partFile: org.apache.spark.sql.Dataset[String] = [value: string] 

私は出力にデータセットを見ることができます。私は、JSONファイルをロードする場合しかし:

pfile: org.apache.spark.sql.DataFrame = [address: struct<city: string, state: string>, age: bigint ... 1 more field] 

JSON /寄木細工/オークのファイルが持っているスキーマ:

val pfile = spark.read.json("hdfs://quickstart:8020/user/cloudera/pjson") 

成果は、既製のスキーマとデータフレームです。だから私はこれがSparkバージョンの2xの機能であることを理解できます。これは、このケースではDataFrameを直接取得し、通常のtextFileの場合は理に適っていないスキーマがないデータセットを取得します。 私が知りたいのは、sparkにtextFileをロードした結果のデータセットにスキーマを追加する方法です。 RDDの場合は、スキーマを追加してDataFrameに変換するcase class/StructTypeオプションがあります。 誰でも私にそれをどうすればいいのか教えてくれますか? dataset/dataframeを作成するcase classを使用して

答えて

4

textFileを使用すると、各行ファイルの文字列はデータセットの文字列になります。スキーマとデータフレームに変換するには、toDFを使用することができます。この場合

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile") 

import sqlContext.implicits._ 
val df = partFile.toDF("string_column") 

は、データフレームのタイプStringTypeの単一の列のスキーマを持っています。

あなたのファイルは、より複雑なスキーマが含まれている場合(ファイルが構造化csv形式である場合)は、CSVリーダーを使用することができ、次のいずれか

val partFile = spark.read.option("header", "true").option("delimiter", ";").csv("hdfs://quickstart:8020/user/cloudera/partfile") 

それとも、マップを使用してデータセットを処理することができ、その後、使用してtoDFをDataFrameに変換してください。たとえば、あなたは一つの列が(intとして)行の最初の文字と(もintとして)第四文字であるために、他の列になりたいとします。また

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile") 

val processedDataset: Dataset[(Int, Int)] = partFile.map { 
    line: String => (line(0).toInt, line(3).toInt) 
} 

import sqlContext.implicits._ 
val df = processedDataset.toDF("value0", "value3") 

、あなたが定義することができますあなたのデータフレームの最終的なスキーマ表現するケースクラス、:df.printSchemaを呼び出す上記のどちらの場合も

case class MyRow(value0: Int, value3: Int) 

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile") 

val processedDataset: Dataset[MyRow] = partFile.map { 
    line: String => MyRow(line(0).toInt, line(3).toInt) 
} 

import sqlContext.implicits._ 
val df = processedDataset.toDF 

は、表示されるでしょう:あなたの答えに基づいて

root 
|-- value0: integer (nullable = true) 
|-- value3: integer (nullable = true) 
+0

を、私はそれを少し微調整しなければなりませんでした。区切り文字に基づいてデータセットを分割する:val partdata = partFile.map(p => p.split( "、")) また、このステートメントを変更する必要がありました:val prdt = partdata.map {line => rows(line非数値データが 'char'形式であり、私が持っていたので、 (0).toInt、line(1).toString、line(2).toInt、line(3).toString、line(4).toString)} それらを 'String'に変換します。それは今働いている。 – Sidhartha

+1

@シドハルタ、それがうまくいくことは知っていました。コンマで区切られたファイルの場合は、私が最初に 'spark.read.csv'を使うことを考えてみてください。それは簡単かもしれません。 –

0

はあなたのようにメインの実行クラスの外case classを定義する必要があなたは

x1,32 
x2,32 
x3,32 

としてデータnameageを含むテキストファイルを持って言うことができます非常に簡単です。

case class Info(name: String, 
       age: Int) 

次に、r sparkContext.textFileを使用してファイルをeadingとケースクラスの上に私たちは、あなたがrdd[Row]schemaを作成し、出力が同じである

val data = sc.textFile("path to text file") 
    .map(line=> line.split(",")).map(array => Row(array(0), array(1).toInt)) 

val schema = StructType(
    Array(
    StructField("name", StringType, true), 
    StructField("age", IntegerType, true) 
) 
) 
sqlContext.createDataFrame(data, schema).show(false) 

として sqlContextを使用する必要がどこ schemaを使用して dataframe

val data = sc.textFile("path to text file") 

    import sqlContext.implicits._ 
    data.map(line => line.split(",")).map(array => Info(array(0), array(1).toInt)).toDF.show(false) 

+----+---+ 
|name|age| 
+----+---+ 
|x1 |32 | 
|x2 |32 | 
|x3 |32 | 
+----+---+ 

は、以下の通りである必要があります上記

+----+---+ 
|name|age| 
+----+---+ 
|x1 |32 | 
|x2 |32 | 
|x3 |32 | 
+----+---+ 
関連する問題