
ML DecisionTreeClassifier - continuous features

How can I score continuous features (as opposed to categorical features) with ml.DecisionTreeClassifier, without using the Bucketizer or QuantileDiscretizer methods?

Below is code that passes continuous features to ML's DecisionTreeClassifier without using the binning (Bucketizer) functionality. Most of the scoring set ends up unscored and dropped, because Spark 2.1 does not support handleInvalid="keep".

from pyspark.ml import Pipeline 
from pyspark.sql import SparkSession 
from pyspark.sql.types import DoubleType 
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder 
from pyspark.ml.classification import DecisionTreeClassifier 
from pyspark.sql.functions import udf 

spark = SparkSession.builder.getOrCreate() 

# Load the training set that is in parquet format into a data frame 
train_df = spark.read.parquet("/data/training_set") 

# convert data types to double 
train_df = train_df.withColumn("income", train_df["income"].cast(DoubleType())) 
train_df = train_df.withColumn("age", train_df["age"].cast(DoubleType())) 

# StringIndexer - Target 
# First use a StringIndexer to convert the string label into numeric indices 
indexer1 = StringIndexer(inputCol="target", outputCol="target_numeric", handleInvalid="skip") 

############ 
# StringIndexer/OneHotEncoder - income 
# First use a StringIndexer to get numeric categorical indices 
indexer2 = StringIndexer(inputCol="income", outputCol="income_numeric", handleInvalid='skip') 

# Next, binarize the categorical indices via OneHotEncoder 
encoder2 = OneHotEncoder(inputCol="income_numeric", outputCol="income_vector") 
############ 

############ 
# StringIndexer/OneHotEncoder - age 
# First use a StringIndexer to get numeric categorical indices 
indexer3 = StringIndexer(inputCol="age", outputCol="age_numeric", handleInvalid='skip') 

# Next, binarize the categorical indices via OneHotEncoder 
encoder3 = OneHotEncoder(inputCol="age_numeric", outputCol="age_vector") 
############ 

# Collect the encoded feature columns that will be assembled 
# into the feature vector 
indexedcols = [ 
    "income_vector", 
    "age_vector" 
] 

# FEATURES need to be in a single Vector column, which is why we use a VectorAssembler. 
# The VectorAssembler takes the "indexedcols" list created above as its inputCols 
# parameter and produces the "features" output column. 
va = VectorAssembler(inputCols=indexedcols, outputCol="features") 

# Create a DecisionTreeClassifier, setting the label column to the 
# indexed label column ("target_numeric") and the features column to the 
# "features" column created by the VectorAssembler above. 
# The StringIndexer transformers, the VectorAssembler, and the 
# DecisionTreeClassifier are then stored in a list called "steps". 
clf = DecisionTreeClassifier(labelCol="target_numeric", impurity="gini", maxBins=32, maxMemoryInMB=1024) 

# Create steps for transform for the ml pipeline 
steps = [indexer1, 
     indexer2, encoder2, 
     indexer3, encoder3, 
     va, clf] 

# Create a ML pipeline named "pl" using the steps list to set the stages parameter 
pl = Pipeline(stages=steps) 

# Run the fit method of the pipeline on the DataFrame and store the 
# fitted model in a new variable called "plmodel" 
plmodel = pl.fit(train_df) 

###################################################################################### 
# Scoring Set 
###################################################################################### 

# Now get the data you want to run the model against 
scoring_df = spark.read.parquet("/data/scoring_set") 

# convert data types to double 
scoring_df = scoring_df.withColumn("income", scoring_df["income"].cast(DoubleType())) 
scoring_df = scoring_df.withColumn("age", scoring_df["age"].cast(DoubleType())) 

# Run the transform method of the pipeline model created above 
# on the "scoring_df" DataFrame to create a new DataFrame called "predictions". 
# 
# handleInvalid="skip" skips any labels that are not in the training set. Without it, 
# errors such as 'unseen label: 40' are raised, meaning the scoring set contains a 
# value for the feature that did not exist in the training set. 
predictions = plmodel.transform(scoring_df) 

# UDFs to pull the per-class probabilities out of the probability vector 
vector_udf0 = udf(lambda vector: float(vector[0]), DoubleType()) 
vector_udf1 = udf(lambda vector: float(vector[1]), DoubleType()) 

# Save dataframe to HDFS 
outputDF = predictions.select("age", 
                              "income", 
                              "prediction", 
                              vector_udf0("probability").alias("probability0"), 
                              vector_udf1("probability").alias("probability1")) 
outputDF.write.format("parquet").mode("overwrite").save("/data/algo_scored") 

Answer

For continuous features, you do not need to use Bucketizer or QuantileDiscretizer. For categorical features you can include StringIndexer and OneHotEncoder in the pipeline, but for continuous features you only need to pass them to the VectorAssembler; the DecisionTreeClassifier bins continuous features automatically (the maxBins parameter controls how many candidate split thresholds are considered).
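The key change in isolation (a minimal sketch reusing the column names from the question):

# Continuous columns feed the VectorAssembler directly - no StringIndexer/OneHotEncoder stages 
va = VectorAssembler(inputCols=["income", "age"], outputCol="features") 
clf = DecisionTreeClassifier(labelCol="target_numeric", impurity="gini", maxBins=32) 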

Putting it together, the full code looks like this:

from pyspark.ml import Pipeline 
from pyspark.sql import SparkSession 
from pyspark.sql.types import DoubleType 
from pyspark.ml.feature import StringIndexer, VectorAssembler 
from pyspark.ml.classification import DecisionTreeClassifier 
from pyspark.sql.functions import udf 

spark = SparkSession.builder.getOrCreate() 

# Load the training set that is in parquet format into a data frame 
train_df = spark.read.parquet("/data/training_set") 

# convert data types to double 
train_df = train_df.withColumn("income", train_df["income"].cast(DoubleType())) 
train_df = train_df.withColumn("age", train_df["age"].cast(DoubleType())) 

# StringIndexer - Target 
# First use a StringIndexer to convert the string label into numeric indices 
indexer1 = StringIndexer(inputCol="target", outputCol="target_numeric", handleInvalid="skip") 

# The continuous feature columns go straight into the assembler - 
# no StringIndexer/OneHotEncoder stages are needed 
featurecols = [ 
    "income", 
    "age" 
] 

# FEATURES need to be in a single Vector column, which is why we use a VectorAssembler. 
# The VectorAssembler takes the continuous columns directly as its inputCols 
# parameter and produces the "features" output column. 
va = VectorAssembler(inputCols=featurecols, outputCol="features") 

# Create a DecisionTreeClassifier, setting the label column to the 
# indexed label column ("target_numeric") and the features column to the 
# "features" column created by the VectorAssembler above. 
# The StringIndexer, the VectorAssembler, and the DecisionTreeClassifier 
# are then stored in a list called "steps". 
clf = DecisionTreeClassifier(labelCol="target_numeric", impurity="gini", maxBins=32, maxMemoryInMB=1024) 

# Create steps for transform for the ml pipeline 
steps = [indexer1, 
     va, clf] 

# Create a ML pipeline named "pl" using the steps list to set the stages parameter 
pl = Pipeline(stages=steps) 

# Run the fit method of the pipeline on the DataFrame and store the 
# fitted model in a new variable called "plmodel" 
plmodel = pl.fit(train_df) 

###################################################################################### 
# Scoring Set 
###################################################################################### 

# Now get the data you want to run the model against 
scoring_df = spark.read.parquet("/data/scoring_set") 

# convert data types to double 
scoring_df = scoring_df.withColumn("income", scoring_df["income"].cast(DoubleType())) 
scoring_df = scoring_df.withColumn("age", scoring_df["age"].cast(DoubleType())) 

# Run the transform method of the pipeline model created above 
# on the "scoring_df" DataFrame to create a new DataFrame called "predictions". 
# 
# handleInvalid="skip" on the StringIndexer skips rows whose label was not seen in 
# the training set; without it, errors like 'unseen label: 40' are raised. 
predictions = plmodel.transform(scoring_df) 

# UDFs to pull the per-class probabilities out of the probability vector 
vector_udf0 = udf(lambda vector: float(vector[0]), DoubleType()) 
vector_udf1 = udf(lambda vector: float(vector[1]), DoubleType()) 

# Save dataframe to HDFS 
outputDF = predictions.select("age", 
                              "income", 
                              "prediction", 
                              vector_udf0("probability").alias("probability0"), 
                              vector_udf1("probability").alias("probability1")) 
outputDF.write.format("parquet").mode("overwrite").save("/data/algo_scored") 