2016-04-27 20 views
1

ストリーミングデータに対して線形回帰モデルを訓練する必要があります。私はtextFileStreamを使ってストリーミングデータを読んだ。しかし、問題は、RegressionMetricsRDD[(Double, Double)]を受け付け、outputDStream[Double,Double]の形式であることです。 outputRDD[(Double, Double)]に変換する方法をRegressionMetricsにできますか?DStream [Double、Double]をRDDに変換する(Double、Double)

val model = new StreamingLinearRegressionWithSGD() 
     .setInitialWeights(Vectors.dense(0.0, 0.0)) 
     .setStepSize(0.2) 
     .setNumIterations(25) 

    trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse) 
testData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse) 

model.trainOn(trainingData) 

val output = model.predictOnValues(testData.map(lp => (lp.label, lp.features))) 

val metrics = new RegressionMetrics(output) 
val rmse = metrics.rootMeanSquaredError 
+0

は、あなたが出力を定義しましたの? – eliasah

+0

@eliasah:Uups、申し訳ありませんが、コードを更新しました。 – Klue

答えて

0

すべてのDSTREAMはforeachRDDメソッドを使用してアクセスすることができる基本的なRDD(すべてのデータのバッチのための独立した1)、から構成されています

model.predictOnValues(testData.map(lp => (lp.label, lp.features))).foreachRDD { rdd => 
    val metrics = new RegressionMetrics(rdd) 
    val rmse = metrics.rootMeanSquaredError 
    // do something with `rmse` here 
} 
関連する問題