0
私はカフカデータをストリーミングし、HDFSから既に構築されたモデルをロードし、カフカメッセージを使用して予測を行います。Pysparkはカフカダイレクトストリームを使用して予測します
私はいくつかの方法を試してみましたが、私は理由はTypeErrorのmodel.predictにこだわっている:データはカフカから受け取ったベクトル
にタイプを変換できませんが分離フロートカンマです。 ... preidctionsのために動作しません型TransformedStreamのある特徴、
lines = directKafkaStream.map(lambda x: x[1])
features = lines.map(lambda data: Vectors.dense([float(c) for c in data.split(',')]))
しかし、この時間:私もこれを試してみました
sc = SparkContext(appName="PythonStreamingKafkaForecast")
ssc = StreamingContext(sc, 10)
# Create stream to get kafka messages
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["my_topic"], {"metadata.broker.list": "kafka_ip"})
features = directKafkaStream.foreachRDD(lambda rdd: rdd.map(lambda s: Vectors.dense(s[1].split(","))))
model = LinearRegressionModel.load(sc, "hdfs://hadoop_ip/model.model")
#Predict
predicted = model.predict(features)
:ここ
は私のコードです私が間違っていることを教えてください。
は問題が話題が空でもカフカからデータを読み取ろうとした、あなたの助けのために