1

データフレームの保存に関する問題が発生したとき、Spark 2.0から2.1にコードを移行していました。Spark 2.1でCSVのベクトルフィールドを書き込めません

ここでこのコードは、スパーク2.1.0.cloudera1を用いたスパーク2.0.0

を使用しているとき、私は次のエラーを取得する成功コード

import org.apache.spark.sql.types._ 
import org.apache.spark.ml.linalg.VectorUDT 
val df = spark.createDataFrame(Seq(Tuple1(1))).toDF("values") 
val toSave = new org.apache.spark.ml.feature.VectorAssembler().setInputCols(Array("values")).transform(df) 
toSave.write.csv(path) 

です:

java.lang.UnsupportedOperationException: CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type. 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.org$apache$spark$sql$execution$datasources$csv$CSVFileFormat$$verifyType$1(CSVFileFormat.scala:233) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$verifySchema$1.apply(CSVFileFormat.scala:237) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$verifySchema$1.apply(CSVFileFormat.scala:237) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at org.apache.spark.sql.types.StructType.foreach(StructType.scala:96) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.verifySchema(CSVFileFormat.scala:237) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.prepareWrite(CSVFileFormat.scala:121) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:108) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87) 
    at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:484) 
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:520) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198) 
    at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:579) 
    ... 50 elided 

はこれですが、私の側だけ?

これはSpark 2.1のクローディラリリースに関連していますか? (彼らのレポから、彼らはspark.sqlと混乱していないようだ)

ありがとう!

+1

これが予想されます。

コードは、おおよそ次のようになります。 CSVソースは複雑なオブジェクトをサポートしていません。例外として、_CSVデータソースはstruct 、値:配列>データ型_をサポートしていません。 – zero323

+0

はい、私は考えましたが、なぜそれがSpark 2.0で動作するのですか? –

+0

2.0では動作しません。ベクトルが文字列に変換された1.xでは 'spark-csv'を使用していました。 – zero323

答えて

2

次の回答は、@ zero323のコメントから構成されています。

CSVソースは複雑なオブジェクトをサポートしていません。例外として、CSVデータソースは構造体、値:配列>データ型をサポートしていません。は予期された動作です。ベクトルが文字列に変換された1.xではspark-csvを使用していましたが、Spark 2.xでは動作しません。

この動作は、次のjira SPARK-16216で正しいです。

+1

Spark 2.0.0で動作することを追加するだけです。 2.0.1以上で動作しなくなります。 –

0

回避策として、forkのVectorDisassemblerクラスを使用するか、またはhereと記載されている解決策を実行してください。

私はVectorDisassemblerを使用して、ml.feature.StandardScaler.fitメソッドの結果のデータフレームをCSVに格納しました。

val disassembler = new org.apache.spark.ml.feature.VectorDisassembler() 
val disassembledDF = disassembler.setInputCol("scaledFeatures").transform(df) 
disassembledDF.show() 
関連する問題