WindowsのSpark 2.1(スタンドアロン)。 VectorAssembler列を追加した後に、スパークデータフレームをパーケットファイルに保存できません。ベクトル列の前にデータフレームを保存するために何の問題、すべての "機能" がnullではないん (NVLを使用)SparkException:ベクトル列を追加した後、アセンブルする値をNULLにすることはできません。
val conf = new SparkConf().setAppName("RandomForestModelML").setMaster("local")
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
val df = sparkSession.read
.option("header", true)
.option("delimiter", ";")
.csv("C:\\tmp\\file2.csv")
.createOrReplaceTempView("features")
var data = sparkSession.sql("select cast(NVL(c9003,0) as int) as c9003.. from features")
data = data.withColumnRenamed("target", "label")
val ignored = List("label", "c0025", "c9052", "c0019")
val featureAssembler = new VectorAssembler()
.setInputCols(data.columns.diff(ignored))
.setOutputCol("features")
data = featureAssembler.transform(data)
data.printSchema()
data.show(5)
data.write.format("parquet").save("C:\\tmp\\features.parquet")
printSchemaとshow(5)用の出力:
root
|-- c9003: integer (nullable = true)
|-- c0022: integer (nullable = true)
|-- c9014: integer (nullable = true)
|-- c9008: integer (nullable = true)
|-- a8401: integer (nullable = true)
|-- c0021: double (nullable = true)
|-- c0025: string (nullable = true)
|-- d1417: integer (nullable = true)
|-- d0006: integer (nullable = true)
|-- c9052: string (nullable = true)
|-- c0023: integer (nullable = true)
|-- d1501: integer (nullable = true)
|-- c0020: integer (nullable = true)
|-- d0007: integer (nullable = true)
|-- c0024: integer (nullable = true)
|-- c4018: integer (nullable = true)
|-- at180: integer (nullable = true)
|-- c1421: integer (nullable = true)
|-- label: integer (nullable = true)
|-- features: vector (nullable = true)
+-----+-----+-----+-----+-----+-----+--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------------------+
|c9003|c0022|c9014|c9008|a8401|c0021| c0025|d1417|d0006|c9052|c0023|d1501|c0020|d0007|c0024|c4018|at180|c1421|label| features|
+-----+-----+-----+-----+-----+-----+--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------------------+
| 10| 1| 4| 53| 0| 0.0|FHB KERESKEDELMI ...| 0| 0| P| 2| 0| 1| 0| 0| 0| 0| 1| 0|(16,[0,1,2,3,8,10...|
| 10| 1| 3| 69| 0| 0.01| MKB BANK ZRT| 1| 0| P| 0| 0| 1| 0| 0| 0| 0| 1| 0|(16,[0,1,2,3,5,6,...|
| 100| 2| 4| 57| 0| 0.24|SANTANDER CONSUME...| 1| 18| P| 2| 1| 1| 0| 0| 0| 0| 1| 0|[100.0,2.0,4.0,57...|
| 100| 2| 5| 55| 0| 0.09|SANTANDER CONSUME...| 0| 0| P| 0| 0| 1| 0| 0| 0| 0| 1| 0|(16,[0,1,2,3,5,10...|
| 10| 3| 2| 65| 4| 0.23|RAIFFEISEN BANK ZRT.| 2| 14| P| 0| 2| 1| 0| 0| 0| 0| 1| 0|[10.0,3.0,2.0,65....|
+-----+-----+-----+-----+-----+-----+--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------------------+
only showing top 5 rows
と例外:
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<c9003_double_vecAssembler_41f4486b7bab:double,c0022_double_vecAssembler_41f4486b7bab:double,c9014_double_vecAssembler_41f4486b7bab:double,c9008_double_vecAssembler_41f4486b7bab:double,a8401_double_vecAssembler_41f4486b7bab:double,c0021:double,d1417_double_vecAssembler_41f4486b7bab:double,d0006_double_vecAssembler_41f4486b7bab:double,c0023_double_vecAssembler_41f4486b7bab:double,d1501_double_vecAssembler_41f4486b7bab:double,c0020_double_vecAssembler_41f4486b7bab:double,d0007_double_vecAssembler_41f4486b7bab:double,c0024_double_vecAssembler_41f4486b7bab:double,c4018_double_vecAssembler_41f4486b7bab:double,at180_double_vecAssembler_41f4486b7bab:double,c1421_double_vecAssembler_41f4486b7bab:double>) => vector)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Values to assemble cannot be null.
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
... 16 more
更新日: データに問題があるように見えますが、どこを理解できません。私は "ラベル"と "c9003"を除くすべての列を削除しようとしましたが、正常に動作しています。他の列では正常に動作していますが、c9014で例外が発生します。
root
|-- label: string (nullable = false)
|-- c9003: integer (nullable = true)
|-- c9014: integer (nullable = true)
|-- features: vector (nullable = true)
アップデート2:今、私は、ベクトル列を追加した後のデータフレーム
var data = sparkSession.sql("select NVL(target,0) as target, cast(NVL(c9003,0) as int) as c9003, cast(NVL(c9014,0) as int) as c9014 from features where c9014 is not null")
data.show(20)
+------+-----+-----+
|target|c9003|c9014|
+------+-----+-----+
| 0| 10| 4|
| 0| 10| 3|
| 0| 100| 4|
| 0| 100| 5|
| 0| 10| 2|
| 0| 10| 6|
| 0| 10| 2|
| 0| 90| 4|
| 0| 80| 4|
| 0| 80| 5|
| 0| 10| 2|
| 0| 90| 8|
| 0| 90| 8|
| 0| 90| 8|
| 0| 90| 4|
| 0| 80| 5|
| 0| 80| 2|
| 0| 80| 2|
| 0| 90| 7|
| 0| 90| 8|
+------+-----+-----+
only showing top 20 rows
ため、このクエリをしようとしているメモリ/データ量に問題のように見えます。私はSQLでフィルタを追加しようとしました:
キャスト(NVL intとして(c9014,9999))> 1000年 - として>それはOK取り組んでいる
キャスト(NVL(c9014,9999) INT)= 1000 < - >それはc9014でOK
ノーフィルターを働いている - >メモリ・チューニング上の任意のヒント例外
を上げますか?
あなたはいくつかの行のデータセットを共有できますか? – BDR
メインポストを更新しました。列c9014の問題のようです – Triffids