2016-05-11 14 views
3

私はAkkaストリームを学習しており、演習としてCassandraにログを挿入したいと思います。問題は、ストリームをデータベースに挿入するように管理できないことです。AkkaストリームでCassandraに挿入

私は単純に以下を試した:[

val session = cluster.connect() 

session.execute(s"CREATE KEYSPACE IF NOT EXISTS $keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};") 

session.execute(s"DROP TABLE IF EXISTS $keyspace.$table;") 

session.execute(s"CREATE TABLE $keyspace.$table (...)") 

val preparedStatement = session.prepare(s"INSERT INTO $keyspace.$table (...) VALUES (...);") 

def saveLog(logEntry: ApacheLog) = { 
    val stmt = preparedStatement.bind(...) 

    session.executeAsync(stmt) 
    } 

アレイからの変換:

object Application extends AkkaApp with LogApacheDao { 

    // The log file is read line by line 
    val source: Source[String, Unit] = Source.fromIterator(() => scala.io.Source.fromFile(filename).getLines()) 

    // Each line is converted to an ApacheLog object 
    val flow: Flow[String, ApacheLog, Unit] = Flow[String] 
    .map(rawLine => { 
     rawLine.split(",") // implicit conversion Array[String] -> ApacheLog 
    }) 

    // Log objects are inserted to Cassandra 
    val sink: Sink[ApacheLog, Future[Unit]] = Sink.foreach[ApacheLog] { log => saveLog(log) } 

    source.via(flow).to(sink).run() 

} 

SAVELOG()は、この(私は明確にコードの列の値を省略)のようLogApacheDaoで定義されています文字列]をApacheLogに送ることは問題なしに起こります(printlnで検証されます)。また、キースペースとテーブルの両方が作成されますが、saveLogに実行が来ると、何かがブロックされていて、挿入されていないようです。

私はすべてのエラーが、カサンドラ・ドライバ・コア(3.0.0)を取得していない私を与え続けて:

Connection[/172.17.0.2:9042-1, inFlight=0, closed=false] was inactive for 30 seconds, sending heartbeat 
Connection[/172.17.0.2:9042-2, inFlight=0, closed=false] heartbeat query succeeded 

私はdockerizedカサンドラを使用することを追加する必要があります。

+1

私はakkaストリームについてしか読んでいないので、それは単なる推測です。あなたは 'saveLog'が例外をスローされていないと確信していますか?私は試してみるか、確かめるために例外の印刷物でそれをキャッチします。これらのcassandraログは、接続が30秒間非アクティブであることを示し、その後、ハートビートが送信されて開いたままになっていることを示します。 –

+0

これはおそらく関連しているhttp://stackoverflow.com/questions/35631754/why-is-akka-streams-swallowing-my-exceptions –

+0

アドバイスのおかげでsaveLogから最終的な例外をキャッチしようとします –

答えて

関連する問題