2017-11-24 2 views
-1

スカートで私のakkaストリームアプリケーションをテストすると、なぜNullPointerExceptionが発生するのですか?理解できません...私はおそらくAkkaストリームで何かを見逃しました。テストキットとスカラーテストを使用したAkkaストリームテスト

私はこれが私のアプリ

object CdrToMongoReactiveStream extends App { 

    implicit val system = ActorSystem("cdr-data-generator") 
    implicit val materializer = ActorMaterializer() 
    implicit val executionContext=materializer.executionContext 
    import RandomCdrJsonProtocol._ 

    val randomCdrThrottledSource : Source[RandomCdr,NotUsed]= Source 
    .fromIterator(() => Iterator.continually(RandomCdr(msisdnLength,timeRange))) 
    .throttle(throughput,1.second,1,ThrottleMode.shaping) 
    .named("randomCdrThrottledSource") 

    val cdrJsonParseFlow : Flow[RandomCdr,String,NotUsed]= Flow[RandomCdr] 
    .map((cdr: RandomCdr) => cdr.toJson.toString()) 
    .named("cdrJsonParseFlow") 

    val mongodbBulkSink : Sink[String,NotUsed] = Flow[String] 
    .map((json: String) => Document.parse(json)) 
    .map((doc: Document) => new InsertOneModel[Document](doc)) 
    .grouped(bulkSize) 
    .flatMapConcat { (docs: Seq[InsertOneModel[Document]]) ⇒ 
     Source.fromPublisher(collection.bulkWrite(docs.toList.asJava)) 
    } 
    .to(Sink.ignore) 

    val f = randomCdrThrottledSource.via(cdrJsonParseFlow).runWith(mongodbBulkSink) 
} 

そして、私のテストファイル

class CdrToMongoReactiveStreamSpec extends WordSpec with Matchers { 

    import RandomCdrJsonProtocol._ 

    "randomCdrThrottledSource" should { 
    "generate RandomCdr elements only" in { 
     val future = CdrToMongoReactiveStream.randomCdrThrottledSource 
     // line 30 in the error 
     .runWith(Sink.head)(CdrToMongoReactiveStream.materializer) 

     val cdr = Await.result(future,10.second) 
     cdr shouldBe a [RandomCdr] 
    } 
    } 
    "cdrJsonParseFlow" should { 
    "parse RandomCdr to correct json format" in { 
     val randomCdr = RandomCdr("+33612345678",1511448336402L,"+33612345678","SMS","OUT",0,0,0) 
     val (pub,sub) = TestSource.probe[RandomCdr] 
     .via(CdrToMongoReactiveStream.cdrJsonParseFlow) 
     .toMat(TestSink.probe[String])(Keep.both) 
     .run() 

     sub.request(1) 
     pub.sendNext(randomCdr) 
     sub.expectNext() shouldBe equal(randomCdr.toJson.toString()) 
    } 
    } 
} 

、エラーメッセージでScalaの2.12.4とSBT 1.0.3 でscalatestためのコードの共通の構造を使用

java.lang.NullPointerException was thrown. 
java.lang.NullPointerException 
    at CdrToMongoReactiveStreamSpec.$anonfun$new$2(CdrToMongoReactiveStreamSpec.scala:30) 
    at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) 
    at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) 
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) 
    at org.scalatest.Transformer.apply(Transformer.scala:22) 
    at org.scalatest.Transformer.apply(Transformer.scala:20) 
    at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078) 
    at org.scalatest.TestSuite.withFixture(TestSuite.scala:196) 
    at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195) 
    at org.scalatest.WordSpec.withFixture(WordSpec.scala:1881) 
    at org.scalatest.WordSpecLike.invokeWithFixture$1(WordSpecLike.scala:1076) 
    at org.scalatest.WordSpecLike.$anonfun$runTest$1(WordSpecLike.scala:1088) 
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289) 
    at org.scalatest.WordSpecLike.runTest(WordSpecLike.scala:1088) 
    at org.scalatest.WordSpecLike.runTest$(WordSpecLike.scala:1070) 
    at org.scalatest.WordSpec.runTest(WordSpec.scala:1881) 
    at org.scalatest.WordSpecLike.$anonfun$runTests$1(WordSpecLike.scala:1147) 
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396) 
    at scala.collection.immutable.List.foreach(List.scala:389) 
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384) 
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:373) 
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:410) 
    at scala.collection.immutable.List.foreach(List.scala:389) 
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384) 
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379) 
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461) 
    at org.scalatest.WordSpecLike.runTests(WordSpecLike.scala:1147) 
    at org.scalatest.WordSpecLike.runTests$(WordSpecLike.scala:1146) 
    at org.scalatest.WordSpec.runTests(WordSpec.scala:1881) 
    at org.scalatest.Suite.run(Suite.scala:1147) 
    at org.scalatest.Suite.run$(Suite.scala:1129) 
    at org.scalatest.WordSpec.org$scalatest$WordSpecLike$$super$run(WordSpec.scala:1881) 
    at org.scalatest.WordSpecLike.$anonfun$run$1(WordSpecLike.scala:1192) 
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521) 
    at org.scalatest.WordSpecLike.run(WordSpecLike.scala:1192) 
    at org.scalatest.WordSpecLike.run$(WordSpecLike.scala:1190) 
    at org.scalatest.WordSpec.run(WordSpec.scala:1881) 
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45) 
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1340) 
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1334) 
    at scala.collection.immutable.List.foreach(List.scala:389) 
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334) 
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031) 
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010) 
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500) 
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010) 
    at org.scalatest.tools.Runner$.run(Runner.scala:850) 
    at org.scalatest.tools.Runner.run(Runner.scala) 
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138) 
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28) 
+0

は、おそらくライン30上の値のいずれかがnullです。それらを印刷したり、IDEのデバッグツールを使ってどれがどれであるか把握してみましたか?元のファイルのどの行が30行目であるかははっきりしていません。実行してからいくつかの行を削除したように見えるからです。あなたの質問に行を追加してください、または重要でない場合は、行なしでテストを再実行して、有用なスタックトレースを得ることができますか? –

+0

エラーはあなたのコードには意味がありません。ヌルポインタ例外は 'CdrToMongoReactiveStreamSpec.scala:30'に対処しますが、あなたのコードスニペットはわずか25行です。 –

+0

申し訳ありませんが、準備のためにコードを簡略化しました。 30行目はrunwith(...) – vgkowski

答えて

0

ソース、フロー、シンクをメインプログラム

object CdrToMongoReactiveStream { 

    def randomCdrThrottledSource(msisdnLength : Int,timeRange : Int, throughput : Int): Source[RandomCdr,NotUsed]= { 
    Source 
     .fromIterator(() => Iterator.continually(RandomCdr(msisdnLength,timeRange))) 
     .throttle(throughput,1.second,1,ThrottleMode.shaping) 
     .named("randomCdrThrottledSource") 
    } 

    def cdrJsonParseFlow : Flow[RandomCdr,String,NotUsed]= { 
    import RandomCdrJsonProtocol._ 

    Flow[RandomCdr] 
     .map((cdr: RandomCdr) => cdr.toJson.toString()) 
     .named("cdrJsonParseFlow") 
    } 

    def mongodbBulkSink(collection : MongoCollection[Document], bulkSize : Int) : Sink[String,NotUsed] = { 

    Flow[String] 
     .map((json: String) => Document.parse(json)) 
     .map((doc: Document) => new InsertOneModel[Document](doc)) 
     .grouped(bulkSize) 
     .flatMapConcat { (docs: Seq[InsertOneModel[Document]]) ⇒ 
     Source.fromPublisher(collection.bulkWrite(docs.toList.asJava)) 
     } 
     .to(Sink.ignore) 
    } 

    def main(args: Array[String]): Unit = { 
    val f = randomCdrThrottledSource(msisdnLength,timeRange,throughput) 
     .via(cdrJsonParseFlow).runWith(mongodbBulkSink(collection,bulkSize)) 

    logger.info("Generated random data") 
    } 
} 

とテストファイル

class CdrToMongoReactiveStreamSpec extends WordSpec with Matchers { 

    import CdrToMongoReactiveStream._ 
    import RandomCdrJsonProtocol._ 

    implicit val system = ActorSystem("cdr-data-generator") 
    implicit val materializer = ActorMaterializer() 

    val collection = new Fongo("mongo test server").getDB("cdrDB").getCollection("cdr") 
    val randomCdr = RandomCdr("+33612345678",1511448336402L,"+33612345678","SMS","OUT",0,0,0) 

    "randomCdrThrottledSource" should { 
    "generate RandomCdr elements only" in { 
     val future = CdrToMongoReactiveStream.randomCdrThrottledSource(8,86400000,1) 
     .runWith(Sink.head) 

     val cdr = Await.result(future,5.second) 
     cdr shouldBe a [RandomCdr] 
    } 
    } 
} 
関連する問題