2016-10-06 3 views
0

私のテストフレームワークにSpark-MongoDBコネクタを設定しようとしています。私はこのようなDSTREAMを設定しようとするたびにスパークMongodbコネクタユニットテスト

val conf = new SparkConf() 
      .setMaster("local[*]") 
      .setAppName("test") 
      .set("spark.mongodb.input.uri", "mongodb://localhost:27017/testdb.testread") 
      .set("spark.mongodb.output.uri", "mongodb://localhost:27017/testdb.testwrite") 

lazy val ssc = new StreamingContext(conf, Seconds(1))

:私のStreamingContextは、このように設定されている

val records = new ConstantInputDStream(ssc, ssc.sparkContext.makeRDD(seq))

私は、このエラー

で打撃を受けます

java.lang.IllegalStateException:停止したSparkCのメソッドを呼び出せません文章

コンテキストが起動してすぐに停止しているように見えますが、理由がわかりません。ログにエラーはありません。

DEBUG] 2016年10月6日18:29: - クイック@ 4858ms osjsServletContextHandler @ 33b85bc {51625 org.spark_project.jetty.util.component.AbstractLifeCycle setStartedそれは起動完了後、直ちに停止する場所です/ metrics/json、null、AVAILABLE} [WARN] 2016-10-06 18:29:51,660 org.apache.spark.streaming.StreamingContext logWarning - StreamingContextはまだ開始されていません [DEBUG] 2016-10-06 18 :29:51,662 org.spark_project.jetty.util.component.AbstractLifeCycle setStopping - 停止中[email protected] [DEBUG] 2016-10-06 18:29:51,664 org.spark_project.jetty.server .Server doStop - 正常なシャットダウン[email protected] by

私はそれがシャットダウンしないとすべてがうまくある設定MongoDBの接続を取り外します(私は:(モンゴする書き込み/読み込みができませ除く)

EDIT: これは私がしようとテストですmongoに書き込む。しかし、この時点までに私のテストスイートは失敗します。

"read from kafka queue" in new SparkScope{ 

    val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](List("topic"), 
     Map[String, Object](
     "bootstrap.servers"->s"localhost:${kServer.kafkaPort}", 
     "key.deserializer" -> classOf[StringDeserializer], 
     "value.deserializer" -> classOf[StringDeserializer], 
     "group.id" -> "testing", 
     "auto.offset.reset" -> "latest", 
     "enable.auto.commit" -> (false: java.lang.Boolean) 
    ) 
    ) 
) 
    val writeConfig = WriteConfig(Map(
    "collection"->"testcollection", 
    "writeConcern.w"->"majority", 
    "db"->"testdb" 
), Some(WriteConfig(ssc.sparkContext))) 

    stream.map(r => (r.key.toLong, r.value.toLong)) 
    .reduceByKey(_+_) 
    .map{case (k,v) => { 
     val d = new Document() 
     d.put("key", k) 
     d.put("value", v) 
     d 
    }} 
    .foreachRDD(rdd => rdd.saveToMongoDB(writeConfig)) 

    ssc.start 
    (1 until 10).foreach(x => producer.send(KafkaProducerRecord("topic", "1", "1"))) 
    ssc.awaitTerminationOrTimeout(1500) 
    ok 
} 

私はScalaのコレクションからのストリームを作成しようとすると失敗がここで発生するには:

"return a single record with the correct sum" in new SparkScope{ 
    val stream = new ConstantInputDStream(ssc, ssc.sparkContext.makeRDD(seq)) 
    val m = HashMap.empty[Long,Long] 
    FlattenTimeSeries.flatten(stream).foreachRDD(rdd => m ++= rdd.collect()) 
    ssc.start() 
    ssc.awaitTerminationOrTimeout(1500) 
    m.size === 1 and m(1) === 20 
    } 

SparkScopeクラスはちょうど私が上記示したStreamingContextを作成し、試験後ssc.stop()を呼び出し

+0

非常に奇妙です - 例では、あなたはMongoで何もしていません - それを拡張できますか? – Ross

答えて

1

問題は、SparkConf変数がlazyと宣言されていないが、StreamingContextが宣言されていないことです。なぜそれが重要なのか分かりませんが、それはありません。一定。

+0

こんにちはuser1748268、私はMongoDBにデータを保存しようとしていますが、まだ成功していません。私は見て継続することができるように実行中のプロジェクトの単純化されたフォームを共有してください。事前に感謝、乾杯:) –

+0

こんにちは@DynamicRemo。私が上に掲示したコードは基本的に完全であり、実際には完全に機能します。私が取り組んでいた問題はspecs2関連でした(私のスコープは、特性の代わりに抽象クラスでした)。あなたはどんな特定の問題を抱えていますか?多分私は助けることができる – Tim