2016-11-30 2 views
10

スパークのCDH 1.6です。タイムスタンプと日付タイプを含むスパークデータフレームへのCSVの読み取り

私は、ApacheスパークDATAFRAMEにこの仮想CSVをインポートしようとしています:

$ hadoop fs -cat test.csv 
a,b,c,2016-09-09,a,2016-11-11 09:09:09.0,a 
a,b,c,2016-09-10,a,2016-11-11 09:09:10.0,a 

は私がdatabricks-CSV瓶を使用しています。

val textData = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", "false") 
    .option("delimiter", ",") 
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss") 
    .option("inferSchema", "true") 
    .option("nullValue", "null") 
    .load("test.csv") 

結果のDataFrameのスキーマを作成するために、inferSchemaを使用します。 printSchema()関数は、上記のコードのために私に次のような出力が得られます。

scala> textData.printSchema() 
root 
|-- C0: string (nullable = true) 
|-- C1: string (nullable = true) 
|-- C2: string (nullable = true) 
|-- C3: string (nullable = true) 
|-- C4: string (nullable = true) 
|-- C5: timestamp (nullable = true) 
|-- C6: string (nullable = true) 

scala> textData.show() 
+---+---+---+----------+---+--------------------+---+ 
| C0| C1| C2|  C3| C4|     C5| C6| 
+---+---+---+----------+---+--------------------+---+ 
| a| b| c|2016-09-09| a|2016-11-11 09:09:...| a| 
| a| b| c|2016-09-10| a|2016-11-11 09:09:...| a| 
+---+---+---+----------+---+--------------------+---+ 

C3列が文字列タイプがあります。私はC3がの日付タイプを持って欲しいです。これを日付型にするには、次のコードを試してみました。

val textData = sqlContext.read.format("com.databricks.spark.csv") 
    .option("header", "false") 
    .option("delimiter", ",") 
    .option("dateFormat", "yyyy-MM-dd") 
    .option("inferSchema", "true") 
    .option("nullValue", "null") 
    .load("test.csv") 

scala> textData.printSchema 
root 
|-- C0: string (nullable = true) 
|-- C1: string (nullable = true) 
|-- C2: string (nullable = true) 
|-- C3: timestamp (nullable = true) 
|-- C4: string (nullable = true) 
|-- C5: timestamp (nullable = true) 
|-- C6: string (nullable = true) 

scala> textData.show() 
+---+---+---+--------------------+---+--------------------+---+ 
| C0| C1| C2|     C3| C4|     C5| C6| 
+---+---+---+--------------------+---+--------------------+---+ 
| a| b| c|2016-09-09 00:00:...| a|2016-11-11 00:00:...| a| 
| a| b| c|2016-09-10 00:00:...| a|2016-11-11 00:00:...| a| 
+---+---+---+--------------------+---+--------------------+---+ 

このコードと最初のブロックとの間の唯一の違いは、代わり「YYYY-MM-DD HHのDATEFORMATオプション行(Iは"YYYY-MM-DD" を使用ある:MM:SS ")。C330とC5の両方をタイムスタンプ(C3はまだ日付ではありません)としています。しかし、C5の場合、HH :: mm:ss部分は無視され、データにはゼロとして表示されます。

理想的には、C3は日付型、C5はタイムスタンプ型、HH:mm:ss型は無視されません。私の解決策は今のようになります。私は自分のDBから並列にデータを引っ張ってcsvを作ります。私はすべての日付をタイムスタンプとして引っ張るようにします(理想的ではありません)。だから、テストのcsvは次のようになります。

$ hadoop fs -cat new-test.csv 
a,b,c,2016-09-09 00:00:00,a,2016-11-11 09:09:09.0,a 
a,b,c,2016-09-10 00:00:00,a,2016-11-11 09:09:10.0,a 

これが私の最後の作業コードです:ここで

val textData = sqlContext.read.format("com.databricks.spark.csv") 
    .option("header", "false") 
    .option("delimiter", ",") 
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss") 
    .schema(finalSchema) 
    .option("nullValue", "null") 
    .load("new-test.csv") 

が、私は完全なタイムスタンプの形式を使用します(「YYYY-MM-DD HH:MMを:ss ")をdateFormatに挿入します。私は手動でc3が日付でC5がタイムスタンプ型(Spark SQL型)であるfinalSchemaインスタンスを作成します。これらのスキーマを適用するには、schema()関数を使用します。次のような出力が見えます:?

scala> finalSchema 
res4: org.apache.spark.sql.types.StructType = StructType(StructField(C0,StringType,true), StructField(C1,StringType,true), StructField(C2,StringType,true), StructField(C3,DateType,true), StructField(C4,StringType,true), StructField(C5,TimestampType,true), StructField(C6,StringType,true)) 

scala> textData.printSchema() 
root 
|-- C0: string (nullable = true) 
|-- C1: string (nullable = true) 
|-- C2: string (nullable = true) 
|-- C3: date (nullable = true) 
|-- C4: string (nullable = true) 
|-- C5: timestamp (nullable = true) 
|-- C6: string (nullable = true) 


scala> textData.show() 
+---+---+---+----------+---+--------------------+---+ 
| C0| C1| C2|  C3| C4|     C5| C6| 
+---+---+---+----------+---+--------------------+---+ 
| a| b| c|2016-09-09| a|2016-11-11 09:09:...| a| 
| a| b| c|2016-09-10| a|2016-11-11 09:09:...| a| 
+---+---+---+----------+---+--------------------+---+ 

が容易またはCSVファイルを解析するためのボックスの方法(つまり、スパークデータフレームに日付とタイムスタンプのタイプの両方を持っている

関連リンクの外にあります:非自明な例のための推論オプションを使用すると
http://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options
https://github.com/databricks/spark-csv

答えて

1

それはおそらく期待した結果を返すことはありませんあなたはで見ることができるように。:

if (field == null || field.isEmpty || field == nullValue) { 
    typeSoFar 
} else { 
    typeSoFar match { 
    case NullType => tryParseInteger(field) 
    case IntegerType => tryParseInteger(field) 
    case LongType => tryParseLong(field) 
    case DoubleType => tryParseDouble(field) 
    case TimestampType => tryParseTimestamp(field) 
    case BooleanType => tryParseBoolean(field) 
    case StringType => StringType 
    case other: DataType => 
     throw new UnsupportedOperationException(s"Unexpected data type $other") 

それが唯一のタイムスタンプの種類ではなく、日付タイプで、各列に一致するようにしようとしますので、この場合の「ボックスソリューションの外」ことはできません。しかし私の経験では、より簡単な解決策は、needed typeでスキーマを直接定義しているため、推測オプションを避けることになります。最終的なスキーマは効率的なソリューションです。

関連する問題