2017-02-04 9 views
0

私はカフカデータをストリーミングし、HDFSから既に構築されたモデルをロードし、カフカメッセージを使用して予測を行います。Pysparkはカフカダイレクトストリームを使用して予測します

私はいくつかの方法を試してみましたが、私は理由はTypeErrorのmodel.predictにこだわっている:データはカフカから受け取ったベクトル

にタイプを変換できませんが分離フロートカンマです。 ... preidctionsのために動作しません型TransformedStreamのある特徴、

lines = directKafkaStream.map(lambda x: x[1]) 
features = lines.map(lambda data: Vectors.dense([float(c) for c in data.split(',')])) 

しかし、この時間:私もこれを試してみました

sc = SparkContext(appName="PythonStreamingKafkaForecast") 
ssc = StreamingContext(sc, 10) 

# Create stream to get kafka messages 
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["my_topic"], {"metadata.broker.list": "kafka_ip"}) 

features = directKafkaStream.foreachRDD(lambda rdd: rdd.map(lambda s: Vectors.dense(s[1].split(",")))) 

model = LinearRegressionModel.load(sc, "hdfs://hadoop_ip/model.model") 

#Predict 
predicted = model.predict(features) 

:ここ

は私のコードです

私が間違っていることを教えてください。

は問題が話題が空でもカフカからデータを読み取ろうとした、あなたの助けのために

答えて

0

[OK]をありがとうございます。

これは私の問題を解決しました:

def predict(rdd): 
    count = rdd.count() 
    if (count > 0): 
     features = rdd.map(lambda s: Vectors.dense(s[1].split(","))) 

     return features 
    else: 
    print("No data received") 

directKafkaStream.foreachRDD(lambda rdd: predict(rdd)) 
関連する問題