私は、ストリーム処理用のラムダアーキテクチャシステムを実装しています。Spark Streaming:ストリームにパイプラインを読み込む方法
私は、SparkバッチでGridSearchとのパイプラインを作成する何の問題を持っていない:しかし、私は、Sparkストリーミング処理中にパイプラインを接続する方法を見つけるように見えるカント
pipeline = Pipeline(stages=[data1_indexer, data2_indexer, ..., assembler, logistic_regressor])
paramGrid = (
ParamGridBuilder()
.addGrid(logistic_regressor.regParam, (0.01, 0.1))
.addGrid(logistic_regressor.tol, (1e-5, 1e-6))
...etcetera
).build()
cv = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=4)
pipeline_cv = cv.fit(raw_train_df)
model_fitted = pipeline_cv.getEstimator().fit(raw_validation_df)
model_fitted.write().overwrite().save("pipeline")
。私はDSTREAMソースとしてカフカを使用していますが、次のように今のように私のコードは次のとおりです。
pyspark.streamingインポートStreamingContextからimport json
from pyspark.ml import PipelineModel
from pyspark.streaming.kafka import KafkaUtils
は
ssc = StreamingContext(sc, 1)
kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark- streaming-consumer", {"kafka_topic": 1})
model = PipelineModel.load('pipeline/')
parsed_stream = kafkaStream.map(lambda x: json.loads(x[1]))
CODE MISSING GOES HERE
ssc.start()
ssc.awaitTermination()
とは、今私は
を行うための何らかの方法を見つける必要がありますドキュメントhere(非常に古くなっているように見えますが)あなたのモデルは、predictのメソッドを実装して、rddオブジェクト上で使用できるように思われます(そしてうまくいけばkafkastreamにあります)
Streamingコンテキストでパイプラインを使用するにはどうすればよいですか?再ロードされたPipelineModelのみが実装されているようですtransform
Streamingコンテキストでバッチモデルを使用する唯一の方法は、純粋なモデルを使用し、パイプラインを使用しないことですか?