2017-09-12 16 views
0

は私がAlpakkaカサンドラライブラリになぜAkka Streamsアプリケーションは正常に終了しないのですか?

package com.abhi 

import akka.actor.ActorSystem 
import akka.stream.{ActorMaterializer, ClosedShape} 
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource 
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink} 
import com.datastax.driver.core.{Cluster, Row, SimpleStatement} 
import scala.concurrent.Await 
import scala.concurrent.duration._ 

object MyApp extends App { 
    implicit val actorSystem = ActorSystem() 
    implicit val actorMaterializer = ActorMaterializer() 
    implicit val session = Cluster 
     .builder 
     .addContactPoints(List("localhost") :_*) 
     .withPort(9042) 
     .withCredentials("foo", "bar") 
     .build 
     .connect("foobar") 
    val stmt = new SimpleStatement("SELECT col1, col2 FROM foo").setFetchSize(20) 
    val source = CassandraSource(stmt) 
    val toFoo = Flow[Row].map(row => Foo(row.getLong(0), row.Long(1))) 
    val sink = Sink.foreach[Foo](foo => println(foo.col1, foo.col2)) 
    val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){ implicit b => 
     s => 
     import GraphDSL.Implicits._ 
     source.take(10) ~> toFoo ~> s 
     ClosedShape 
    }) 
    // let us run the graph 
    val future = graph.run() 
    import actorSystem.dispatcher 
    future.onComplete{_ => 
     session.close() 
     Await.result(actorSystem.terminate(), Duration.Inf) 
    } 
    Await.result(future, Duration.Inf) 
    System.exit(0) 
} 

case class Foo(col1: Long, col2: Long) 

を使用して、この簡単なアプリケーションを書いたこのアプリケーションは、期待どおりに動作しますが画面上の10行を表示します。

しかし、投稿がハングアップします。 System.exit(0)コールが実行されると、それは

Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-0" 

を例外がスローされます。しかし、まだアプリケーションが実行を停止しません。それはちょうどハングアップします。このアプリケーションが正常に終了しない理由を私は理解していない

(実際にはそれもでSystem.exit(0)の呼び出しは必要ありません。

このアプリケーションを終了する唯一の方法は、コントロールCを経由しています。

答えて

2

SBTは、独自のJVMインスタンスでコードが実行されますので、これは起こるかもしれない、あなたのSystem.exit意志その後、上記の結果を与え、出口SBTのJVM

あなたが設定してみました:。?fork in run := trueをどこかのsbtビルドで

actorSystem.dispatcherを使用してonCompleteコールバックを実行することをお勧めします(これを使用して、アクターシステム自体の終了を待つため)。あなたの代わりに試みることができる

何か:JVMを使用すると、残された唯一のスレッドがデーモンスレッドのときSystem.exitを呼び出すために必要とせずに終了することを

import actorSystem.dispatcher 
future.onComplete{ _ => 
    session.close() 
    actorSystem.terminate() 
} 
Await.result(actorSystem.whenTerminated, Duration.Inf) 

注意(たとえばWhat is Daemon thread in Java?を参照)。

関連する問題