2016-04-26 3 views
2

私はTestSuiteBaseを使用して、spark-streaming(スパークストリーミングコンテキストsccを使用)でいくつかのテストを作成しています。次に、output: Seq[Seq[(Double, Double)]]を使用してダミーデータを作成します。最後に、outputにいくつかの関数を適用したいが、この関数はSeq[Seq[(Double, Double)]]ではなくRDD[(Double, Double)]を受け付ける。スパークストリーミングコンテキストで動作するときにSeqをRDDに変換する方法

この問題を解決するには、val rdd: RDD[(Double, Double)] = sc.parallelize(output.flatten)を使用することを考えていますが、どのようにして正確にスパークのコンテキストを取得する必要がありますかscsccから?または、を使わずに直接RDDにダミーデータを直接作成する方法はありますか?

class StreamingTestLR extends SparkFunSuite 
         with TestSuiteBase { 

    // use longer wait time to ensure job completion 
    override def maxWaitTimeMillis: Int = 20000 

    var ssc: StreamingContext = _ 

    override def afterFunction() { 
    super.afterFunction() 
    if (ssc != null) { 
     ssc.stop() 
    } 
    } 

//... 

val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches) 

// THE PROBLEM IS HERE!!! 
// val metrics = new SomeFuncThatAcceptsRDD(rdd) 

} 

UPDATE

// Test if the prediction accuracy of increases when using hyper-parameter optimization 
    // in order to learn Y = 10*X1 + 10*X2 on streaming data 
    test("Test 1") { 
    // create model initialized with zero weights 
    val model = new StreamingLinearRegressionWithSGD() 
     .setInitialWeights(Vectors.dense(0.0, 0.0)) 
     .setStepSize(0.2) 
     .setNumIterations(25) 

    // generate sequence of simulated data for testing 
    val numBatches = 10 
    val nPoints = 100 
    val testInput = (0 until numBatches).map { i => 
     LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), nPoints, 42 * (i + 1)) 
    } 
    val inputDStream = DStream[LabeledPoint] 

    withStreamingContext(setupStreams(testInput, inputDStream)) { ssc => 
     model.trainOn(inputDStream) 
     model.predictOnValues(inputDStream.map(x => (x.label, x.features))) 
     val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches) 


     val rdd: RDD[(Double, Double)] = ssc.sparkContext.parallelize(output.flatten) 

     // Instantiate metrics object 
     val metrics = new RegressionMetrics(rdd) 

     // Squared error 
     println(s"MSE = ${metrics.meanSquaredError}") 
     println(s"RMSE = ${metrics.rootMeanSquaredError}") 

     // R-squared 
     println(s"R-squared = ${metrics.r2}") 

     // Mean absolute error 
     println(s"MAE = ${metrics.meanAbsoluteError}") 

     // Explained variance 
     println(s"Explained variance = ${metrics.explainedVariance}") 
    } 
    } 

答えて

3

これを試してみてください:ここ

class MyTestSuite extends TestSuiteBase with BeforeAndAfter { 

    test("my test") { 
    withTestServer(new TestServer()) { testServer => 
     // Start the server 
     testServer.start() 
     // Set up the streaming context and input streams 
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => 
     val rdd = ssc.sparkContext.parallelize(output.flatten) 
     // your code here 
     testServer.stop() 
     ssc.stop() 
     } 
    } 
    } 
} 

詳細:https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

+0

それは言う: '停止SparkContext java.lang.IllegalStateExceptionのメソッドを呼び出すことはできません:停止したSparkContextのメソッドを呼び出せません ' – Klue

+0

可能完全なコード例であなたの質問を編集する? withStreamingContext {ssc =>}の内部にテストコードを実行しようとしましたか? –

+0

TestServerはどこで定義されていますか? – Klue

関連する問題