2017-03-08 14 views
3

データセットに直接ロードしたいcsvファイル[1]があります。問題は、私はいつもまたcsvを直接Spark Datasetにロードするには?

org.apache.spark.sql.AnalysisException: Cannot up cast `probability` from string to float as it may truncate 
The type path of the target object is: 
- field (class: "scala.Float", name: "probability") 
- root class: "TFPredictionFormat" 
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object; 

のようなエラーが出る、特にphrasesフィールドのことです、私は私の場合にはすべてのフィールドを定義した場合(ケースクラスを確認し、[2])それは

org.apache.spark.sql.AnalysisException: cannot resolve '`phrases`' due to data type mismatch: cannot cast StringType to ArrayType(StringType,true); 

を取得クラス[2]型Stringとしてすべてうまく動作しますが、これは私が欲しいものではありません。それを行う簡単な方法はありますか?


参照

[1]例行

B017NX63A2,Merrell,"['merrell_for_men', 'merrell_mens_shoes', 'merrel']",merrell_shoes,0.0806054356579781 

import spark.implicits._ 

val INPUT_TF = "<SOME_URI>/my_file.csv" 

final case class TFFormat (
    doc_id: String, 
    brand: String, 
    phrases: Seq[String], 
    prediction: String, 
    probability: Float 
) 

val ds = sqlContext.read 
.option("header", "true") 
.option("charset", "UTF8") 
.csv(INPUT_TF) 
.as[TFFormat] 

ds.take(1).map(println) 

[3]私は発見した方法を以下のように[2]私のコードスニペットであります最初にDataFrameレベルの列を定義し、データをDataseに変換することでt(hereまたはhereまたはhereのように)私はほとんどこれが行われることになっている方法ではないと確信しています。私はまた、エンコーダは、おそらくその答えであることをかなり確信しているが、私はどのように

答えて

5

TLの手掛かりを持っていない。標準DataFrame操作で変換csv入力してDRは、移動するための方法です。あなたが避けたいのであれば、表現力豊かな入力形式(パレットまたはJSON)を使うべきです。

通常、静的に型指定されたデータセットに変換されるデータは、すでに正しい型である必要があります。それを行うための最も効率的な方法は、csv読者のためにschema引数を提供することです:schemaが反射によって推測することができ

val schema: StructType = ??? 
val ds = spark.read 
    .option("header", "true") 
    .schema(schema) 
    .csv(path) 
    .as[T] 

:残念ながら

import org.apache.spark.sql.catalyst.ScalaReflection 
import org.apache.spark.sql.types.StructType 

val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType] 

それはあなたのデータとクラスために動作しません。 csvリーダーはArrayTypeをサポートしていません(ただし、FloatTypeのようなアトミックタイプでは機能します)ので、難しい方法を使用する必要があります。素朴な解決策は、以下のように表すことができます。

import org.apache.spark.sql.functions._ 

val df: DataFrame = ??? // Raw data 

df 
    .withColumn("probability", $"probability".cast("float")) 
    .withColumn("phrases", 
    split(regexp_replace($"phrases", "[\\['\\]]", ""), ",")) 
    .as[TFFormat] 

いますが、phrasesの内容に応じて、より洗練された何かが必​​要な場合があります。

+3

ありがとうございます!もう1つの角度を追加するだけです:エンコーダーを使ってスキーマを推論することもできます: 'Encoders.product [TFFormat] .schema' –

関連する問題