2017-02-01 13 views
0

私は〜で区切られたテキストファイルを持っていますが、データフレームに変換する前にいくつかの解析を実行する必要があります。コードはRDD [String]が構文解析を行うので、テキストファイルを読み込みます。次に、RDD [行]に変換します。次に、スキーマを使用してデータフレームを作成します。Spark Scalaデータフレーム変換

私が持っている次のコードは以下のとおりです。それは動作しますが、問題は実際のスキーマが400フィールド長いことです。私は、属性(1)、属性(2)、属性(3)などを入力するより簡単な方法があるかどうか疑問に思っていました。

私は現在Spark 1.6を使用しています。 CDH 5.2.2

例入力:

20161481132310 ~  ~"This" is a comma 10 

現在のコード:

val schema_1 = StructType(Array(
StructField("EXAMPLE_1", StringType, true), 
StructField("EXAMPLE_2", StringType, true), 
StructField("EXAMPLE_3", StringType, true))) 

val rdd = sc.textFile("example.txt") 
val rdd_truncate = rdd.map(_.split("~").map(_.trim).mkString("~")) 
val row_final = rdd_truncate 
    .map(_.split("~")) 
    .map(attributes => Row(attributes(0), 
    attributes(1), 
    attributes(2))) 

val df = sqlContext.createDataFrame(row_final, schema_1) 

私は次のために変更の提案に基づいています。引用符を除いて動作します。入力の「This」は失敗します。助言がありますか?あなただけselect使用するフィールドをトリミングしたい場合

spark.read.schema(schema).option("delimiter", "~").csv("example.txt") 

import org.apache.spark.sql.functions.{col, trim} 

df.select(df.columns.map(c => trim(col(c)).alias(c)): _*) 

を使用すると、スパーク1.1を使用する場合は、spark-csvを使用することができます。

val df = sqlContext.read 
     .format("com.databricks.spark.csv") 
     .option("delimiter","~") 
     .schema(schema) 
     .load("example.txt") 
val df_final = df.select(df.columns.map(c =>trim(col(c)).alias(c)): _*) 

答えて

3

ちょうど標準のCSVリーダーを使用し

sqlContext.read 
    .format("com.databricks.spark.csv") 
    .schema(schema) 
    .option("delimiter", "~") 
    .load("example.txt") 

これが何らかの理由で十分でない場合は、Row.fromSeqを使用できます。

Row.fromSeq(line.split("~").take(3)) 
関連する問題