2017-06-13 15 views
5

私はデータセットに私のデータフレームに変換する必要があると私は、次のコードを使用:ScalaのApache Sparkでデータフレームをデータセットに変換するには?

val final_df = Dataframe.withColumn(
     "features", 
     toVec4(
     // casting into Timestamp to parse the string, and then into Int 
     $"time_stamp_0".cast(TimestampType).cast(IntegerType), 
     $"count", 
     $"sender_ip_1", 
     $"receiver_ip_2" 
    ) 
    ).withColumn("label", (Dataframe("count"))).select("features", "label") 

    final_df.show() 

    val trainingTest = final_df.randomSplit(Array(0.3, 0.7)) 
    val TrainingDF = trainingTest(0) 
    val TestingDF=trainingTest(1) 
    TrainingDF.show() 
    TestingDF.show() 

    ///lets create our liner regression 
    val lir= new LinearRegression() 
    .setRegParam(0.3) 
    .setElasticNetParam(0.8) 
    .setMaxIter(100) 
    .setTol(1E-6) 

    case class df_ds(features:Vector, label:Integer) 
    org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) 

    val Training_ds = TrainingDF.as[df_ds] 

私の問題は、は、私は次のようなエラーだ、ということである:

Error:(96, 36) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. 
    val Training_ds = TrainingDF.as[df_ds] 

をそれは数いるようですデータフレーム内の値の数は、自分のクラスの値の数と異なります。しかし、私はTrainingDFのデータフレームにcase class df_ds(features:Vector, label:Integer)を使用しています。それは、機能のベクトルと整数ラベルを持っているからです。ここでTrainingDFのデータフレームは、次のとおりです。私が述べたエラーを得たしかし

+------------+-----------+-------------+-----+ 
|time_stamp_0|sender_ip_1|receiver_ip_2|count| 
+------------+-----------+-------------+-----+ 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.3|  10.0.0.2| 10| 
+------------+-----------+-------------+-----+ 

:またここに

+--------------------+-----+ 
|   features|label| 
+--------------------+-----+ 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,10...| 10| 
+--------------------+-----+ 

は私のオリジナルfinal_dfデータフレームです!誰か助けてくれますか? ありがとうございます。

答えて

8

あなたが読んでいるエラーメッセージはかなり良いポインタです。

DataFrameDatasetに変換するときは、DataFrameの行に格納されているものはすべてEncoderである必要があります。プリミティブのようなタイプ(Int秒、String Sなど)のためのエンコーダとcase classesはちょうどあなたのSparkSessionのための暗黙をインポートすることによって提供されているが好き

は次のとおりです。

case class MyData(intField: Int, boolField: Boolean) // e.g. 

val spark: SparkSession = ??? 
val df: DataFrame = ??? 

import spark.implicits._ 

val ds: Dataset[MyData] = df.as[MyData] 

をそれはどちらか動作しない場合あなたがタイプしようとしているタイプがDataFrameにキャストされていないためです。その場合は、Encoderと書く必要があります。詳細はhereとなります(Encoderjava.time.LocalDateTimehereをご覧ください。

+0

死んだリンク[ここ](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-Encoder.html):( –

+0

@joeybaruchレポートのおかげで、私は – stefanobaghino

+0

私は、この回答を書いた時から本に追加されている例を追加する機会を得ました。 – stefanobaghino

関連する問題