2016-04-27

When I run the following test, it throws "Cannot call methods on a stopped SparkContext". The likely cause is that I'm using TestSuiteBase together with a streaming Spark context. At the line 'val gridEvalsRDD = ssc.sparkContext.parallelize(gridEvals)' I need the SparkContext, which I access via 'ssc.sparkContext', and this is where the problem occurs (see the warning and error messages below):

class StreamingTest extends TestSuiteBase with BeforeAndAfter {

  test("Test 1") {
    //...
    val gridEvals = for (initialWeights <- gridParams("initialWeights");
                         stepSize <- gridParams("stepSize");
                         numIterations <- gridParams("numIterations")) yield {
      val lr = new StreamingLinearRegressionWithSGD()
        .setInitialWeights(initialWeights.asInstanceOf[Vector])
        .setStepSize(stepSize.asInstanceOf[Double])
        .setNumIterations(numIterations.asInstanceOf[Int])

      ssc = setupStreams(inputData, (inputDStream: DStream[LabeledPoint]) => {
        lr.trainOn(inputDStream)
        lr.predictOnValues(inputDStream.map(x => (x.label, x.features)))
      })

      val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)
      val cvRMSE = calculateRMSE(output, nPoints)
      println(s"RMSE = $cvRMSE")
      (initialWeights, stepSize, numIterations, cvRMSE)
    }

    val gridEvalsRDD = ssc.sparkContext.parallelize(gridEvals)
  }
}

16/04/27 10:40:17 WARN StreamingContext: StreamingContext has already been stopped
16/04/27 10:40:17 INFO SparkContext: SparkContext already stopped.

Cannot call methods on a stopped SparkContext

UPDATE: This is the base class TestSuiteBase:

trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {

  // Name of the framework for Spark context
  def framework: String = this.getClass.getSimpleName

  // Master for Spark context
  def master: String = "local[2]"

  // Batch duration
  def batchDuration: Duration = Seconds(1)

  // Directory where the checkpoint data will be saved
  lazy val checkpointDir: String = {
    val dir = Utils.createTempDir()
    logDebug(s"checkpointDir: $dir")
    dir.toString
  }

  // Number of partitions of the input parallel collections created for testing
  def numInputPartitions: Int = 2

  // Maximum time to wait before the test times out
  def maxWaitTimeMillis: Int = 10000

  // Whether to use manual clock or not
  def useManualClock: Boolean = true

  // Whether to actually wait in real time before changing manual clock
  def actuallyWait: Boolean = false

  // A SparkConf to use in tests. Can be modified before calling setupStreams to configure things.
  val conf = new SparkConf()
    .setMaster(master)
    .setAppName(framework)

  // Timeout for use in ScalaTest `eventually` blocks
  val eventuallyTimeout: PatienceConfiguration.Timeout = timeout(Span(10, ScalaTestSeconds))

  // Default before function for any streaming test suite. Override this
  // if you want to add your stuff to "before" (i.e., don't call before { })
  def beforeFunction() {
    if (useManualClock) {
      logInfo("Using manual clock")
      conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
    } else {
      logInfo("Using real clock")
      conf.set("spark.streaming.clock", "org.apache.spark.util.SystemClock")
    }
  }

  // Default after function for any streaming test suite. Override this
  // if you want to add your stuff to "after" (i.e., don't call after { })
  def afterFunction() {
    System.clearProperty("spark.streaming.clock")
  }

  before(beforeFunction)
  after(afterFunction)

  /**
   * Run a block of code with the given StreamingContext and automatically
   * stop the context when the block completes or when an exception is thrown.
   */
  def withStreamingContext[R](ssc: StreamingContext)(block: StreamingContext => R): R = {
    try {
      block(ssc)
    } finally {
      try {
        ssc.stop(stopSparkContext = true)
      } catch {
        case e: Exception =>
          logError("Error stopping StreamingContext", e)
      }
    }
  }

  /**
   * Run a block of code with the given TestServer and automatically
   * stop the server when the block completes or when an exception is thrown.
   */
  def withTestServer[R](testServer: TestServer)(block: TestServer => R): R = {
    try {
      block(testServer)
    } finally {
      try {
        testServer.stop()
      } catch {
        case e: Exception =>
          logError("Error stopping TestServer", e)
      }
    }
  }

  /**
   * Set up required DStreams to test the unary DStream operation using the sequence
   * of input collections.
   */
  def setupStreams[U: ClassTag, V: ClassTag](
      input: Seq[Seq[U]],
      operation: DStream[U] => DStream[V],
      numPartitions: Int = numInputPartitions
    ): StreamingContext = {
    // Create StreamingContext
    val ssc = new StreamingContext(conf, batchDuration)
    if (checkpointDir != null) {
      ssc.checkpoint(checkpointDir)
    }

    // Setup the stream computation
    val inputStream = new TestInputStream(ssc, input, numPartitions)
    val operatedStream = operation(inputStream)
    val outputStream = new TestOutputStreamWithPartitions(operatedStream,
      new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
    outputStream.register()
    ssc
  }

  /**
   * Set up required DStreams to test the binary operation using the two sequences
   * of input collections.
   */
  def setupStreams[U: ClassTag, V: ClassTag, W: ClassTag](
      input1: Seq[Seq[U]],
      input2: Seq[Seq[V]],
      operation: (DStream[U], DStream[V]) => DStream[W]
    ): StreamingContext = {
    // Create StreamingContext
    val ssc = new StreamingContext(conf, batchDuration)
    if (checkpointDir != null) {
      ssc.checkpoint(checkpointDir)
    }

    // Setup the stream computation
    val inputStream1 = new TestInputStream(ssc, input1, numInputPartitions)
    val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions)
    val operatedStream = operation(inputStream1, inputStream2)
    val outputStream = new TestOutputStreamWithPartitions(operatedStream,
      new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]])
    outputStream.register()
    ssc
  }

  /**
   * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
   * returns the collected output. It will wait until `numExpectedOutput` number of
   * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
   *
   * Returns a sequence of items for each RDD.
   */
  def runStreams[V: ClassTag](
      ssc: StreamingContext,
      numBatches: Int,
      numExpectedOutput: Int
    ): Seq[Seq[V]] = {
    // Flatten each RDD into a single Seq
    runStreamsWithPartitions(ssc, numBatches, numExpectedOutput).map(_.flatten.toSeq)
  }

  /**
   * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
   * returns the collected output. It will wait until `numExpectedOutput` number of
   * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
   *
   * Returns a sequence of RDDs. Each RDD is represented as several sequences of items, each
   * representing one partition.
   */
  def runStreamsWithPartitions[V: ClassTag](
      ssc: StreamingContext,
      numBatches: Int,
      numExpectedOutput: Int
    ): Seq[Seq[Seq[V]]] = {
    assert(numBatches > 0, "Number of batches to run stream computation is zero")
    assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
    logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)

    // Get the output buffer
    val outputStream = ssc.graph.getOutputStreams.
      filter(_.isInstanceOf[TestOutputStreamWithPartitions[_]]).
      head.asInstanceOf[TestOutputStreamWithPartitions[V]]
    val output = outputStream.output

    try {
      // Start computation
      ssc.start()

      // Advance manual clock
      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
      logInfo("Manual clock before advancing = " + clock.getTimeMillis())
      if (actuallyWait) {
        for (i <- 1 to numBatches) {
          logInfo("Actually waiting for " + batchDuration)
          clock.advance(batchDuration.milliseconds)
          Thread.sleep(batchDuration.milliseconds)
        }
      } else {
        clock.advance(numBatches * batchDuration.milliseconds)
      }
      logInfo("Manual clock after advancing = " + clock.getTimeMillis())

      // Wait until expected number of output items have been generated
      val startTime = System.currentTimeMillis()
      while (output.size < numExpectedOutput &&
        System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
        logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
        ssc.awaitTerminationOrTimeout(50)
      }
      val timeTaken = System.currentTimeMillis() - startTime
      logInfo("Output generated in " + timeTaken + " milliseconds")
      output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
      assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
      assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")

      Thread.sleep(100) // Give the forgetting of old RDDs some time to complete
    } finally {
      ssc.stop(stopSparkContext = true)
    }
    output
  }

  /**
   * Verify whether the output values after running a DStream operation
   * are the same as the expected output values, by comparing the output
   * collections either as lists (order matters) or sets (order does not matter)
   */
  def verifyOutput[V: ClassTag](
      output: Seq[Seq[V]],
      expectedOutput: Seq[Seq[V]],
      useSet: Boolean
    ) {
    logInfo("--------------------------------")
    logInfo("output.size = " + output.size)
    logInfo("output")
    output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
    logInfo("expected output.size = " + expectedOutput.size)
    logInfo("expected output")
    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
    logInfo("--------------------------------")

    // Match the output with the expected output
    for (i <- 0 until output.size) {
      if (useSet) {
        assert(
          output(i).toSet === expectedOutput(i).toSet,
          s"Set comparison failed\n" +
            s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
            s"Generated output (${output.size} items): ${output.mkString("\n")}"
        )
      } else {
        assert(
          output(i).toList === expectedOutput(i).toList,
          s"Ordered list comparison failed\n" +
            s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
            s"Generated output (${output.size} items): ${output.mkString("\n")}"
        )
      }
    }
    logInfo("Output verified successfully")
  }

  /**
   * Test unary DStream operation with a list of inputs, with number of
   * batches to run same as the number of expected output values
   */
  def testOperation[U: ClassTag, V: ClassTag](
      input: Seq[Seq[U]],
      operation: DStream[U] => DStream[V],
      expectedOutput: Seq[Seq[V]],
      useSet: Boolean = false
    ) {
    testOperation[U, V](input, operation, expectedOutput, -1, useSet)
  }

  /**
   * Test unary DStream operation with a list of inputs
   * @param input          Sequence of input collections
   * @param operation      Unary DStream operation to be applied to the input
   * @param expectedOutput Sequence of expected output collections
   * @param numBatches     Number of batches to run the operation for
   * @param useSet         Compare the output values with the expected output values
   *                       as sets (order does not matter) or as lists (order matters)
   */
  def testOperation[U: ClassTag, V: ClassTag](
      input: Seq[Seq[U]],
      operation: DStream[U] => DStream[V],
      expectedOutput: Seq[Seq[V]],
      numBatches: Int,
      useSet: Boolean
    ) {
    val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
    withStreamingContext(setupStreams[U, V](input, operation)) { ssc =>
      val output = runStreams[V](ssc, numBatches_, expectedOutput.size)
      verifyOutput[V](output, expectedOutput, useSet)
    }
  }

  /**
   * Test binary DStream operation with two lists of inputs, with number of
   * batches to run same as the number of expected output values
   */
  def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
      input1: Seq[Seq[U]],
      input2: Seq[Seq[V]],
      operation: (DStream[U], DStream[V]) => DStream[W],
      expectedOutput: Seq[Seq[W]],
      useSet: Boolean
    ) {
    testOperation[U, V, W](input1, input2, operation, expectedOutput, -1, useSet)
  }

  /**
   * Test binary DStream operation with two lists of inputs
   * @param input1         First sequence of input collections
   * @param input2         Second sequence of input collections
   * @param operation      Binary DStream operation to be applied to the 2 inputs
   * @param expectedOutput Sequence of expected output collections
   * @param numBatches     Number of batches to run the operation for
   * @param useSet         Compare the output values with the expected output values
   *                       as sets (order does not matter) or as lists (order matters)
   */
  def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
      input1: Seq[Seq[U]],
      input2: Seq[Seq[V]],
      operation: (DStream[U], DStream[V]) => DStream[W],
      expectedOutput: Seq[Seq[W]],
      numBatches: Int,
      useSet: Boolean
    ) {
    val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
    withStreamingContext(setupStreams[U, V, W](input1, input2, operation)) { ssc =>
      val output = runStreams[W](ssc, numBatches_, expectedOutput.size)
      verifyOutput[W](output, expectedOutput, useSet)
    }
  }
}
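Why the last line of the test fails: 'runStreamsWithPartitions' (called by 'runStreams') stops the context in its 'finally' block with 'ssc.stop(stopSparkContext = true)', and every 'setupStreams' call builds a new StreamingContext, and with it a new SparkContext, from 'conf'. So after the grid loop, 'ssc' refers to the context of the last iteration, which is already stopped. The sketch below models only that lifecycle in plain Scala so it runs without Spark; 'FakeSparkContext', 'FakeStreamingContext' and 'LifecycleDemo' are hypothetical stand-ins, not Spark classes, and the exception message is shortened from the one in the question.

```scala
// Hypothetical stand-in for SparkContext: refuses work once it has been stopped.
final class FakeSparkContext {
  @volatile private var stopped = false

  def stop(): Unit = stopped = true
  def isStopped: Boolean = stopped

  // Models SparkContext's not-stopped check, which throws IllegalStateException.
  def parallelize[T](data: Seq[T]): Seq[T] = {
    if (stopped) throw new IllegalStateException("Cannot call methods on a stopped SparkContext")
    data
  }
}

// Hypothetical stand-in for StreamingContext: each instance owns a fresh SparkContext,
// like 'new StreamingContext(conf, batchDuration)' in setupStreams.
final class FakeStreamingContext {
  val sparkContext = new FakeSparkContext
  def stop(stopSparkContext: Boolean): Unit =
    if (stopSparkContext) sparkContext.stop()
}

object LifecycleDemo {
  // Simplified runStreams: like runStreamsWithPartitions, it always stops in 'finally'.
  def runStreams(ssc: FakeStreamingContext, numBatches: Int): List[Int] =
    try {
      (1 to numBatches).toList
    } finally {
      ssc.stop(stopSparkContext = true)
    }

  def main(args: Array[String]): Unit = {
    var ssc: FakeStreamingContext = null
    val gridEvals = for (stepSize <- Seq(0.1, 0.01)) yield {
      ssc = new FakeStreamingContext // a new context per grid point, as in the question
      val output = runStreams(ssc, numBatches = 3)
      (stepSize, output.size)
    }
    // Same shape as 'ssc.sparkContext.parallelize(gridEvals)' in the question.
    try {
      ssc.sparkContext.parallelize(gridEvals)
    } catch {
      case e: IllegalStateException => println(e.getMessage)
    }
    // prints: Cannot call methods on a stopped SparkContext
  }
}
```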

Answers


Cannot call methods on a stopped SparkContext

This is the result of some error that happened earlier. Check the logs in $SPARK_HOME$/logs and $SPARK_HOME$/work.


I can't find the logs in the Spark home directory. Do I need to run the code with Logging? – Klue


Did you find this line: 16/04/27 10:40:17 WARN StreamingContext ....? The cause should appear just above it (you can redirect the output to a file if the terminal window's scrollback is limited). – Hlib


I'm using IntelliJ IDEA. How do I save the logs to a file? When I run the program from a terminal it's obvious (> log), but how do I do it in IntelliJ IDEA? Sorry for the beginner question... – Klue


Here are some things you should check:

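  1. Check that the resources you specify in the Spark configuration are actually available.

  2. Search your codebase for the stop() keyword and make sure it is not being called on the SparkContext.

  3. Spark has a Spark UI component; it will show you why the job is failing.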
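For point 2, the IDE's search is usually enough, but the check can also be scripted. A minimal sketch, assuming plain-text Scala sources: 'StopCallFinder' is a hypothetical helper (not part of Spark) that reports each line containing a '.stop(' call. It is a textual heuristic and cannot tell which object 'stop' is called on. Run over the TestSuiteBase shown in the question, it would flag 'ssc.stop(stopSparkContext = true)' in both 'withStreamingContext' and 'runStreamsWithPartitions', plus 'testServer.stop()' in 'withTestServer'.

```scala
import scala.io.Source

object StopCallFinder {
  // Textual heuristic: any '.stop(' call, e.g. 'ssc.stop(...)' or 'sc.stop()'.
  // It cannot tell which object 'stop' is invoked on; inspect each hit by hand.
  private val StopCall = """\.stop\s*\(""".r

  /** Returns (1-based line number, trimmed line) for every line containing a '.stop(' call. */
  def findStopCalls(source: String): List[(Int, String)] =
    source.split("\n").toList.zipWithIndex.collect {
      case (line, idx) if StopCall.findFirstIn(line).isDefined => (idx + 1, line.trim)
    }

  // Usage: pass one or more .scala file paths as arguments.
  def main(args: Array[String]): Unit =
    args.foreach { path =>
      val src = Source.fromFile(path)
      try {
        findStopCalls(src.mkString).foreach { case (n, line) => println(s"$path:$n: $line") }
      } finally {
        src.close()
      }
    }
}
```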


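I think point 2 is the actual cause of the problem. I checked 'TestSuiteBase' (see my update), and it only specifies a StreamingContext. I tried adding 'sc.stop()', but then I have to pass 'sc' as a parameter to the function 'runStreams(...)', and I'm not sure this is the right approach. Could you elaborate on this point in your answer? – Klue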
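On passing 'sc' into 'runStreams(...)': if the RDD is only needed to pick the best parameter combination (an assumption; the question does not say what 'gridEvalsRDD' is used for), no SparkContext is needed at all, because 'gridEvals' is already an ordinary local Seq once the loop finishes. A minimal sketch in plain Scala; 'GridSearchSelection' is hypothetical, 'Seq[Double]' stands in for MLlib's 'Vector', 'cvRMSE' is assumed to be a Double, and the sample values are illustrative only. If an RDD really is required, another option is to create a fresh SparkContext from 'conf' after the streaming runs and stop it yourself at the end.

```scala
object GridSearchSelection {
  // One grid evaluation: (initialWeights, stepSize, numIterations, cvRMSE).
  // Seq[Double] stands in for MLlib's Vector so the sketch runs without Spark.
  type GridEval = (Seq[Double], Double, Int, Double)

  /** Picks the parameter combination with the lowest cross-validated RMSE. */
  def best(gridEvals: Seq[GridEval]): GridEval = {
    require(gridEvals.nonEmpty, "grid search produced no evaluations")
    gridEvals.minBy(_._4)
  }

  def main(args: Array[String]): Unit = {
    // Illustrative values only, not results from the question.
    val gridEvals: Seq[GridEval] = Seq(
      (Seq(0.0, 0.0), 0.1, 50, 1.92),
      (Seq(0.0, 0.0), 0.01, 100, 1.37),
      (Seq(1.0, 1.0), 0.1, 100, 2.05)
    )
    val (weights, stepSize, numIterations, rmse) = best(gridEvals)
    println(s"best: weights=$weights stepSize=$stepSize numIterations=$numIterations RMSE=$rmse")
  }
}
```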


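Based on your updated TestSuiteBase, I'd suggest testing this by commenting out the places where it calls stop. If that works, re-enable them one at a time; that will show you which one is the problem. The rest of the code looks fine to me. – tesnik03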
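Instead of commenting out the stop calls, the SparkContext shutdown could be made switchable in one place. A sketch of that idea, modeled in plain Scala: 'stopSparkContextAfterRun', 'StreamingHarness', 'GridSearchHarness' and the 'Stub*' classes are hypothetical, not part of Spark or of the TestSuiteBase above. In real Spark 1.x only one SparkContext can be active per JVM by default, so this assumes 'setupStreams' would also be changed to build every StreamingContext on one shared SparkContext (the 'StreamingContext(sparkContext, batchDuration)' constructor exists for that) instead of from 'conf'.

```scala
// Hypothetical stand-ins (not Spark classes) so the sketch runs on its own.
final class StubSparkContext {
  private var stopped = false
  def stop(): Unit = stopped = true
  def isStopped: Boolean = stopped
}

final class StubStreamingContext(val sparkContext: StubSparkContext) {
  def stop(stopSparkContext: Boolean): Unit =
    if (stopSparkContext) sparkContext.stop()
}

trait StreamingHarness {
  // Hypothetical switch: the real TestSuiteBase hard-codes 'stopSparkContext = true'.
  def stopSparkContextAfterRun: Boolean = true

  // Assumed change: one SparkContext shared by every StreamingContext the harness creates.
  lazy val sharedSparkContext = new StubSparkContext

  def setupStreams(): StubStreamingContext = new StubStreamingContext(sharedSparkContext)

  def runStreams(ssc: StubStreamingContext): Unit =
    try {
      // start, advance the clock, collect output ...
    } finally {
      ssc.stop(stopSparkContext = stopSparkContextAfterRun)
    }
}

// The grid-search test opts out and stops the SparkContext once, at the very end.
object GridSearchHarness extends StreamingHarness {
  override def stopSparkContextAfterRun: Boolean = false

  def main(args: Array[String]): Unit = {
    for (_ <- 1 to 3) runStreams(setupStreams())
    println(s"SparkContext usable after grid search: ${!sharedSparkContext.isStopped}")
    sharedSparkContext.stop()
  }
}
```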
