2016-09-29 14 views
0

私は、ストリーム処理用のラムダアーキテクチャシステムを実装しています。Spark Streaming:ストリームにパイプラインを読み込む方法

私は、SparkバッチでGridSearchとのパイプラインを作成する何の問題を持っていない:しかし、私は、Sparkストリーミング処理中にパイプラインを接続する方法を見つけるように見えるカント

pipeline = Pipeline(stages=[data1_indexer, data2_indexer, ..., assembler, logistic_regressor]) 

paramGrid = (
ParamGridBuilder() 
.addGrid(logistic_regressor.regParam, (0.01, 0.1)) 
.addGrid(logistic_regressor.tol, (1e-5, 1e-6)) 
...etcetera 
).build() 

cv = CrossValidator(estimator=pipeline, 
       estimatorParamMaps=paramGrid, 
       evaluator=BinaryClassificationEvaluator(), 
       numFolds=4) 

pipeline_cv = cv.fit(raw_train_df) 
model_fitted = pipeline_cv.getEstimator().fit(raw_validation_df) 
model_fitted.write().overwrite().save("pipeline") 

。私はDSTREAMソースとしてカフカを使用していますが、次のように今のように私のコードは次のとおりです。

pyspark.streamingインポートStreamingContextから
import json 
from pyspark.ml import PipelineModel 
from pyspark.streaming.kafka import KafkaUtils 

ssc = StreamingContext(sc, 1) 
kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark- streaming-consumer", {"kafka_topic": 1}) 

model = PipelineModel.load('pipeline/') 
parsed_stream = kafkaStream.map(lambda x: json.loads(x[1])) 

CODE MISSING GOES HERE  

ssc.start() 
ssc.awaitTermination() 

とは、今私は

を行うための何らかの方法を見つける必要があります

ドキュメントhere(非常に古くなっているように見えますが)あなたのモデルは、predictのメソッドを実装して、rddオブジェクト上で使用できるように思われます(そしてうまくいけばkafkastreamにあります)

Streamingコンテキストでパイプラインを使用するにはどうすればよいですか?再ロードされたPipelineModelのみが実装されているようですtransform

Streamingコンテキストでバッチモデルを使用する唯一の方法は、純粋なモデルを使用し、パイプラインを使用しないことですか?

答えて

1

Spark Pipelineをスパークストリーミングにロードする方法が見つかりました。

この解決策は、Spark 2.0で動作します。これは、今後のバージョンではより良い解決策が実現する可能性があるためです。

私が見つけた解決策は、toDF()メソッドを使用してストリーミングrddをデータフレームに変換します。ここでpipeline.transformメソッドを適用できます。

このやっていることはひどく非効率です。

# we load the required libraries 
from pyspark.sql.types import (
     StructType, StringType, StructField, LongType 
     ) 
from pyspark.sql import Row 
from pyspark.streaming.kafka import KafkaUtils 

#we specify the dataframes schema, so spark does not have to do reflections on the data. 

pipeline_schema = StructType(
    [ 
     StructField("field1",StringType(),True), 
     StructField("field2",StringType(),True), 
     StructField("field3", LongType(),True) 
] 
) 

#We load the pipeline saved with spark batch 
pipeline = PipelineModel.load('/pipeline') 

#Setup usual spark context, and spark Streaming Context 
sc = spark.sparkContext 
ssc = StreamingContext(sc, 1) 

#On my case I use kafka directKafkaStream as the DStream source 
directKafkaStream = KafkaUtils.createDirectStream(ssc, [QUEUE_NAME], {"metadata.broker.list": "localhost:9092"}) 

def handler(req_rdd): 
    def process_point(p): 
     #here goes the logic to do after applying the pipeline 
     print(p) 
    if req_rdd.count() > 0: 
     #Here is the gist of it, we turn the rdd into a Row, then into a df with the specified schema) 
     req_df = req_rdd.map(lambda r: Row(**r)).toDF(schema=pipeline_schema) 
     #Now we can apply the transform, yaaay 
     pred = pipeline.transform(req_df) 
     records = pred.rdd.map(lambda p: process_point(p)).collect() 

これが役立ちます。

関連する問題