2017-09-26 4 views
0

Akka Streamsを使用しているときにエラー処理が正しく行われないようです。フローがエラーで失敗しましたリアクティブストリーム仕様に違反してシャットダウン

だからこれは私のコード

var db = Database.forConfig("oracle") 
var mysqlDb = Database.forConfig("mysql_read") 
var mysqlDbWrite = Database.forConfig("mysql_write") 

implicit val actorSystem = ActorSystem() 
val decider : Supervision.Decider = { 
    case _: Exception => 
     println("got an exception restarting connections") 
    // let us restart our connections 
    db.close() 
    mysqlDb.close() 
    mysqlDbWrite.close() 
    db = Database.forConfig("oracle") 
    mysqlDb = Database.forConfig("mysql_read") 
    mysqlDbWrite = Database.forConfig("mysql_write") 
    Supervision.Restart 
} 
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)) 

であると私はfooがすでにMySQLの中に存在しているのであれば基本的には、レコードがで任意のさらに処理するべきではありません。この

val alreadyExistsFilter : Flow[Foo, Foo, NotUsed] = Flow[Foo].mapAsync(10){ foo => 
    try { 
    val existsQuery = sql"""SELECT id FROM foo WHERE id = ${foo.id}""".as[Long] 
    mysqlDbWrite.run(existsQuery).map(v => (foo, v)) 
    } catch { 
    case e: Throwable => 
     println(s"Lookup failed for ${foo}") 
     throw e // will restart the stream 
    } 
}.collect {case (f, v) if v.isEmpty => f} 

のような流れを持っていますストリーム。

mysql検索で何かが失敗した場合(mysqlマシンはかなり悪く、タイムアウトが一般的です)、レコードは印刷されて破棄され、ストリームは残りのレコードと共に提供されます監督。

私はこのコードを実行します。私はここに私を驚か一つのことは、これらの例外が私のcatchブロックから来ていないということです

[error] (mysql_write network timeout executor) java.lang.RuntimeException: java.sql.SQLException: Invalid socket timeout value or state 
java.lang.RuntimeException: java.sql.SQLException: Invalid socket timeout value or state 
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5576) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.sql.SQLException: Invalid socket timeout value or state 
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:998) 
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:937) 
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:926) 
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:872) 
    at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4852) 
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.net.SocketException: Socket is closed 
    at java.net.Socket.setSoTimeout(Socket.java:1137) 
    at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4850) 
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

[error] (mysql_write network timeout executor) java.lang.NullPointerException 
java.lang.NullPointerException 
    at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4850) 
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

ようなエラーを参照してください。キャッチブロックのprintln文が表示されないためです。スタックトレースは私がどこから由来したのかを示していませんが、mysql_writeと言っているので、このフローだけがmysql_writeを使用しているので、上記のFlowと仮定できます。

エラー

[trace] Stack trace suppressed: run last compile:runMain for the full output. 
flow has failed with error Shutting down because of violation of the Reactive Streams specification. 
14:51:06,973 |-INFO in ch.qos.logback.classic.AsyncAppender[asyncKafkaAppender] - Worker thread will flush remaining events before exiting. 
[success] Total time: 3480 s, completed Sep 26, 2017 2:51:07 PM 
14:51:07,603 |-INFO in [email protected] - Sleeping for 1 seconds 

と最後にストリーム全体がクラッシュした私は、反応ストリームの仕様に違反していたかわかりません!

答えて

1

より予測可能な解決策を得るための最初の突き刺しは、ブロック動作(Await.result)を削除し、mapAsyncを使用することです。 alreadyExistsFilter流れの書き換えは次のようになります。アッカでのブロッキングの

val alreadyExistsFilter : Flow[Foo, Foo, NotUsed] = Flow[Foo].mapAsync(3) { foo ⇒ 
    val existsQuery = sql"""SELECT id FROM foo WHERE id = ${foo.id}""".as[Long] 
    foo → Await.result(mysqlDbWrite.run(existsQuery), Duration.Inf) 
    }.collect{ 
    case (foo, res) if res.isDefined ⇒ foo 
    } 

詳細情報はdocsで見つけることができます。

+0

問題は再び発生します。私は上記の私のポストを更新しています。 –

0

Stefanoからの回答は正しいです。実際には、フロー内のコードをブロックするためにエラーが発生しました。

私の最初のプログラムはscala 2.11を実行していましたが、mapAsyncに切り替えても問題は解決されませんでした。

これはコマンドラインツールなので、scala 2.12に切り替えてやり直すのは簡単でした。

私がScala 2.12で試したところ、完全に機能しました。

私を大きく助けたことの1つは、依存関係に"ch.qos.logback" % "logback-classic" % "1.2.3",を持つことです。これにより、実行中のすべてのSQL文が表示され、何か問題が発生した場合に簡単に確認できます。

関連する問題