2017-08-21 5 views
1

私はPySparkで機械学習をしていて、RandomForestClassifierを使っています。私は今までSklearnを使いました。私はCrossValidatorを使用してパラメータを調整し、最良のモデルを取得しています。 SparkのWebサイトから取得したサンプルコードは以下の通りです。PySparkのCrossValidatorは実行を分散しますか?

私が読んでいるところでは、スパークがパラメータ調整を分配するかどうか、またはSklearnのGridSearchCVの場合と同じかどうかはわかりません。

本当にありがとうございます。

from pyspark.ml import Pipeline 
from pyspark.ml.classification import LogisticRegression 
from pyspark.ml.evaluation import BinaryClassificationEvaluator 
from pyspark.ml.feature import HashingTF, Tokenizer 
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder 

# Prepare training documents, which are labeled. 
training = spark.createDataFrame([ 
    (0, "a b c d e spark", 1.0), 
    (1, "b d", 0.0), 
    (2, "spark f g h", 1.0), 
    (3, "hadoop mapreduce", 0.0), 
    (4, "b spark who", 1.0), 
    (5, "g d a y", 0.0), 
    (6, "spark fly", 1.0), 
    (7, "was mapreduce", 0.0), 
    (8, "e spark program", 1.0), 
    (9, "a e c l", 0.0), 
    (10, "spark compile", 1.0), 
    (11, "hadoop software", 0.0) 
], ["id", "text", "label"]) 

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr. 
tokenizer = Tokenizer(inputCol="text", outputCol="words") 
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") 
lr = LogisticRegression(maxIter=10) 
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) 

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance. 
# This will allow us to jointly choose parameters for all Pipeline stages. 
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator. 
# We use a ParamGridBuilder to construct a grid of parameters to search over. 
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam, 
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from. 
paramGrid = ParamGridBuilder() \ 
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \ 
    .addGrid(lr.regParam, [0.1, 0.01]) \ 
    .build() 

crossval = CrossValidator(estimator=pipeline, 
          estimatorParamMaps=paramGrid, 
          evaluator=BinaryClassificationEvaluator(), 
          numFolds=2) # use 3+ folds in practice 

# Run cross-validation, and choose the best set of parameters. 
cvModel = crossval.fit(training) 
+0

ヒントが見つからない場合や質問が明確でない場合は、 – nEO

答えて

2

これはありません。クロス検証は、プレーンnested for loopとして実装されています。トレーニング個々のモデルの

for i in range(nFolds): 
    ... 
    for j in range(numModels): 
     ... 

だけプロセスが配布されます。

0

答えが見つかりました。他のユーザーが回答したように、プロセスはパラレル化されていません。これはシリアル操作です。しかし、このグリッド検索には使用できるspark_sklearnモジュールがありますが、それを配布してもモデルビルドは配布されません。それはトレードオフです。

はここspark_sklearn GridSearchCV

%pyspark 

""" 
DATA - https://kdd.ics.uci.edu/databases/20newsgroups/mini_newsgroups.tar.gz 
METHOD 1 - USING GRIDSEARCH CV FROM SPARK_SKLEARN MODULE BY DATABRICKS 
DOCUMENTATION - https://databricks.com/blog/2016/02/08/auto-scaling-scikit-learn-with-apache-spark.html 
THIS IS DISTRIBUTED OPERATION AS MENTIONED ON THE WEBSITE 
""" 
from spark_sklearn import GridSearchCV 
from pyspark.ml.feature import HashingTF,StopWordsRemover,IDF,Tokenizer 
from pyspark.ml import Pipeline 
from pyspark.sql.types import StructField, StringType, StructType 
from pyspark.ml.feature import IndexToString, StringIndexer 
from spark_sklearn.converter import Converter 
from sklearn.pipeline import Pipeline as S_Pipeline 
from sklearn.ensemble import RandomForestClassifier as S_RandomForestClassifier 

path = 's3://sparkzepellin/mini_newsgroups//*' 
news = sc.wholeTextFiles(path) 
print "Toal number of documents = ",news.count() 

# print 5 samples 
news.takeSample(False,5, 1) 

# Using sqlContext createa dataframe 
schema = ["id", "text", "topic"] 
fields = [StructField(field_name, StringType(), True) for field in schema] 
schema = StructType(fields) 

# Applying the schema decalred above as an RDD 
newsgroups = news.map(lambda (localPath, text): (localPath.split("/")[-1], text, localPath.split("/")[-2])) 
df = sqlContext.createDataFrame(newsgroups, schema) 

df_new = StringIndexer(inputCol="topic", outputCol="label").fit(df).transform(df) 

# Build a pipeline with tokenier, hashing TF, IDF, and finally a RandomForest 
tokenizer = Tokenizer().setInputCol("text").setOutputCol("words") 
hashingTF = HashingTF().setInputCol("words").setOutputCol("rawFeatures") 
idf = IDF().setInputCol("rawFeatures").setOutputCol("features") 

pipeline=Pipeline(stages=[tokenizer, hashingTF, idf]) 
data = pipeline.fit(df_new).transform(df_new) 

# Using Converter, convert to pandas dataframe (numpy) 
# to run on distributed sklearn using spark_sklearn package 
converter = Converter(sc) 
new_df = Converter.toPandas(data.select(data.features.alias("text"), "label")) 

# Sklearn pipeline 
s_pipeline = S_Pipeline([ 
      ('rf', S_RandomForestClassifier()) 
     ]) 

# Random parameters 
parameters = { 
    'rf__n_estimators': (10, 20), 
    'rf__max_depth': (2, 10) 
} 

# Run GridSearchCV using the above defined parameters on the pipeline created 
gridSearch = GridSearchCV(sc, s_pipeline, parameters) 
GS = gridSearch.fit(new_df.text.values, new_df.rating.values) 

操作を並列化し、精度としてメトリックなを取り戻すためにマップメソッドを使用することですこれを行うための別の方法を使用してコードです。

+0

にお問い合わせください。この解決策は良くありません。これはCVを並列化しますが、適合性は並列化しません。 2つのプロセスの1つ、つまりラーニングまたはクロスバリデーションをパラレル化できます。 – eliasah

関連する問題