2017-02-06 15 views
1

複数のフィーチャ変換ステージからなるパイプライン(2.0.1)があります。スパーク:OneHotエンコーダとパイプライン(フィーチャ次元の問題)

これらのステージの一部は、OneHotエンコーダです。アイデア:整数ベースのカテゴリをn個の独立したフィーチャに分類する。

パイプラインモデルを訓練し、それを使ってすべての動作をうまく予測するとき。しかし、訓練されたパイプラインモデルを格納して再ロードすると、問題が発生します。

保存された '訓練された' OneHotエンコーダは、そこにいくつのカテゴリがあるかを追跡しません。ロードされたモデルを使用して予測すると、いくつのカテゴリがあるかが再判定され、トレーニングフィーチャスペースと予測フィーチャスペースのサイズ(ディメンション)が異なります。ツェッペリンのノートブックで実行すると、以下のコード例を参照してください。

import org.apache.spark.ml.feature.OneHotEncoder 
import org.apache.spark.ml.Pipeline 
import org.apache.spark.ml.PipelineModel 


// Specifying two test samples, one with class 5 and one with class 3. This is OneHot encoded into 5 boolean features (sparse vector) 
// Adding a 'filler' column because createDataFrame doesnt like single-column sequences and this is the easiest way to demo it ;) 
val df = spark.createDataFrame(Seq((5, 1), (3, 1))).toDF("class", "filler") 

val enc = new OneHotEncoder() 
    .setInputCol("class") 
    .setOutputCol("class_one_hot") 

val pipeline = new Pipeline() 
    .setStages(Array(enc)) 

val model = pipeline.fit(df) 
model.transform(df).show() 

/* 
+-----+------+-------------+ 
|class|filler|class_one_hot| 
+-----+------+-------------+ 
| 5|  1|(5,[],[]) | 
| 3|  1|(5,[3],[1.0])| 
+-----+------+-------------+ 

Note: Vector of size 5 
*/ 

model.write.overwrite().save("s3a://one-hot") 

val loadedModel = PipelineModel.load("s3a://one-hot") 

val df2 = spark.createDataFrame(Seq((3, 1))).toDF("class", "output") // When using the trained model our input consists of one row (prediction engine style). The provided category for the prediction feature set is category 3 
loadedModel.transform(df2).show() 

/* 
+-----+------+-------------+ 
|class|output|class_one_hot| 
+-----+------+-------------+ 
| 3|  1|(3,[],[]) | 
+-----+------+-------------+ 

Note: Incompatible vector of size 3 
*/ 
私がこの連載をサポートし、私自身OneHotエンコーダをしないことを好むだろう

、私は箱から出して使用することができます任意の選択肢があるのですか?

答えて

1

OneHotEncoderを使用することは意図されていません。 OneHotEncoderTransofrmerであり、Estimatorではありません。レベルに関する情報は保存されませんが、出力ディメンションを決定するにはColumnメタデータに依存します。あなたの場合のように、メタデータが欠落している場合は、フォールバック戦略を使用し、レベルがmax(input_column)と仮定します。シリアライゼーションはここでは無関係です。

通常の使用方法では、PipelineTransformersがあり、メタデータが設定されます。 1つの一般的な例はStringIndexerです。 (ニーズがスパーク> = 2.2)Pythonで同様

import org.apache.spark.ml.attribute.NominalAttribute 

val meta = NominalAttribute.defaultAttr 
    .withName("class") 
    .withValues("0", (1 to 5).map(_.toString): _*) 
    .toMetadata 

loadedModel.transform(df2.select($"class".as("class", meta), $"output")) 

手動でメタデータを設定することは可能であるが、それはより複雑である

from pyspark.sql.functions import col 

meta = {"ml_attr": { 
    "vals": [str(x) for x in range(6)], # Provide a set of levels 
    "type": "nominal", 
    "name": "class"}} 

loaded.transform(
    df.withColumn("class", col("class").alias("class", metadata=meta)) 
) 

メタデータも使用して取り付けることができます。いくつかの異なる方法:How to change column metadata in pyspark?

関連する問題