2017-02-25 12 views
1

WindowsのSpark 2.1(スタンドアロン)。 VectorAssembler列を追加した後に、スパークデータフレームをパーケットファイルに保存できません。ベクトル列の前にデータフレームを保存するために何の問題、すべての "機能" がnullではないん (NVLを使用)SparkException:ベクトル列を追加した後、アセンブルする値をNULLにすることはできません。

val conf = new SparkConf().setAppName("RandomForestModelML").setMaster("local") 
    val sparkSession = SparkSession.builder().config(conf).getOrCreate() 

    val df = sparkSession.read 
    .option("header", true) 
    .option("delimiter", ";") 
    .csv("C:\\tmp\\file2.csv") 
    .createOrReplaceTempView("features") 

    var data = sparkSession.sql("select cast(NVL(c9003,0) as int) as c9003.. from features") 

    data = data.withColumnRenamed("target", "label") 
    val ignored = List("label", "c0025", "c9052", "c0019") 

    val featureAssembler = new VectorAssembler() 
    .setInputCols(data.columns.diff(ignored)) 
    .setOutputCol("features") 
    data = featureAssembler.transform(data) 
    data.printSchema() 
    data.show(5) 

    data.write.format("parquet").save("C:\\tmp\\features.parquet") 

printSchemaとshow(5)用の出力:

root 
|-- c9003: integer (nullable = true) 
|-- c0022: integer (nullable = true) 
|-- c9014: integer (nullable = true) 
|-- c9008: integer (nullable = true) 
|-- a8401: integer (nullable = true) 
|-- c0021: double (nullable = true) 
|-- c0025: string (nullable = true) 
|-- d1417: integer (nullable = true) 
|-- d0006: integer (nullable = true) 
|-- c9052: string (nullable = true) 
|-- c0023: integer (nullable = true) 
|-- d1501: integer (nullable = true) 
|-- c0020: integer (nullable = true) 
|-- d0007: integer (nullable = true) 
|-- c0024: integer (nullable = true) 
|-- c4018: integer (nullable = true) 
|-- at180: integer (nullable = true) 
|-- c1421: integer (nullable = true) 
|-- label: integer (nullable = true) 
|-- features: vector (nullable = true) 


+-----+-----+-----+-----+-----+-----+--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------------------+ 
|c9003|c0022|c9014|c9008|a8401|c0021|    c0025|d1417|d0006|c9052|c0023|d1501|c0020|d0007|c0024|c4018|at180|c1421|label|   features| 
+-----+-----+-----+-----+-----+-----+--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------------------+ 
| 10| 1| 4| 53| 0| 0.0|FHB KERESKEDELMI ...| 0| 0| P| 2| 0| 1| 0| 0| 0| 0| 1| 0|(16,[0,1,2,3,8,10...| 
| 10| 1| 3| 69| 0| 0.01|  MKB BANK ZRT| 1| 0| P| 0| 0| 1| 0| 0| 0| 0| 1| 0|(16,[0,1,2,3,5,6,...| 
| 100| 2| 4| 57| 0| 0.24|SANTANDER CONSUME...| 1| 18| P| 2| 1| 1| 0| 0| 0| 0| 1| 0|[100.0,2.0,4.0,57...| 
| 100| 2| 5| 55| 0| 0.09|SANTANDER CONSUME...| 0| 0| P| 0| 0| 1| 0| 0| 0| 0| 1| 0|(16,[0,1,2,3,5,10...| 
| 10| 3| 2| 65| 4| 0.23|RAIFFEISEN BANK ZRT.| 2| 14| P| 0| 2| 1| 0| 0| 0| 0| 1| 0|[10.0,3.0,2.0,65....| 
+-----+-----+-----+-----+-----+-----+--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------------------+ 
only showing top 5 rows 

と例外:

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<c9003_double_vecAssembler_41f4486b7bab:double,c0022_double_vecAssembler_41f4486b7bab:double,c9014_double_vecAssembler_41f4486b7bab:double,c9008_double_vecAssembler_41f4486b7bab:double,a8401_double_vecAssembler_41f4486b7bab:double,c0021:double,d1417_double_vecAssembler_41f4486b7bab:double,d0006_double_vecAssembler_41f4486b7bab:double,c0023_double_vecAssembler_41f4486b7bab:double,d1501_double_vecAssembler_41f4486b7bab:double,c0020_double_vecAssembler_41f4486b7bab:double,d0007_double_vecAssembler_41f4486b7bab:double,c0024_double_vecAssembler_41f4486b7bab:double,c4018_double_vecAssembler_41f4486b7bab:double,at180_double_vecAssembler_41f4486b7bab:double,c1421_double_vecAssembler_41f4486b7bab:double>) => vector) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188) 
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.SparkException: Values to assemble cannot be null. 
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160) 
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) 
    at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143) 
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99) 
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98) 
    ... 16 more 

更新日: データに問題があるように見えますが、どこを理解できません。私は "ラベル"と "c9003"を除くすべての列を削除しようとしましたが、正常に動作しています。他の列では正常に動作していますが、c9014で例外が発生します。

root 
|-- label: string (nullable = false) 
|-- c9003: integer (nullable = true) 
|-- c9014: integer (nullable = true) 
|-- features: vector (nullable = true) 

アップデート2:今、私は、ベクトル列を追加した後のデータフレーム

var data = sparkSession.sql("select NVL(target,0) as target, cast(NVL(c9003,0) as int) as c9003, cast(NVL(c9014,0) as int) as c9014 from features where c9014 is not null") 
data.show(20) 
+------+-----+-----+ 
|target|c9003|c9014| 
+------+-----+-----+ 
|  0| 10| 4| 
|  0| 10| 3| 
|  0| 100| 4| 
|  0| 100| 5| 
|  0| 10| 2| 
|  0| 10| 6| 
|  0| 10| 2| 
|  0| 90| 4| 
|  0| 80| 4| 
|  0| 80| 5| 
|  0| 10| 2| 
|  0| 90| 8| 
|  0| 90| 8| 
|  0| 90| 8| 
|  0| 90| 4| 
|  0| 80| 5| 
|  0| 80| 2| 
|  0| 80| 2| 
|  0| 90| 7| 
|  0| 90| 8| 
+------+-----+-----+ 
only showing top 20 rows 

ため、このクエリをしようとしているメモリ/データ量に問題のように見えます。私はSQLでフィルタを追加しようとしました:

  1. キャスト(NVL intとして(c9014,9999))> 1000年 - として>それはOK取り組んでいる

  2. キャスト(NVL(c9014,9999) INT)= 1000 < - >それはc9014でOK

  3. ノーフィルターを働いている - >メモリ・チューニング上の任意のヒント例外

を上げますか?

+0

あなたはいくつかの行のデータセットを共有できますか? – BDR

+0

メインポストを更新しました。列c9014の問題のようです – Triffids

答えて

0

問題が解決され、SQLクエリで「c9014としてキャスト(NVL(c9014,0)として)キャスト」 このコードはNULLを生成する可能性があり、NVL()の前にCAST()を使用する必要があります。

関連する問題