2017-11-06 8 views
2

私は、spark 2.2 struct streamingを使ってkafka msgをoracleデータベースに読み込むプロジェクトに入っています。 kafkaへのメッセージの流れは約4000-6000メッセージ/秒です。spark 2.2 struct streaming foreach writer jdbcシンク・ラグ


hdfsファイルシステムをシンク宛先として使用すると、正常に動作します。 foreach jdbc writerを使用すると、時間の経過とともに大きな遅延が発生します。私は遅れがforeachループによって引き起こされると思います。

JDBCシンククラス(単独で、クラスファイルをスタンド):

class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] { 
    val driver = "oracle.jdbc.driver.OracleDriver" 
    var connection: java.sql.Connection = _ 
    var statement: java.sql.PreparedStatement = _ 
    val v_sql = "insert INTO sparkdb.t_cf(EntityId,clientmac,stime,flag,id) values(?,?,to_date(?,'YYYY-MM-DD HH24:MI:SS'),?,stream_seq.nextval)" 

    def open(partitionId: Long, version: Long): Boolean = { 
    Class.forName(driver) 
    connection = java.sql.DriverManager.getConnection(url, user, pwd) 
    connection.setAutoCommit(false) 
    statement = connection.prepareStatement(v_sql) 
    true 
    } 

    def process(value: org.apache.spark.sql.Row): Unit = { 
    statement.setString(1, value(0).toString) 
    statement.setString(2, value(1).toString) 
    statement.setString(3, value(2).toString) 
    statement.setString(4, value(3).toString) 
    statement.executeUpdate()   
    } 

    def close(errorOrNull: Throwable): Unit = { 
    connection.commit() 
    connection.close 
    } 
} 

シンク部分:

val df = spark.readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "namenode:9092").option("fetch.message.max.bytes", "50000000").option("kafka.max.partition.fetch.bytes", "50000000") 
    .option("subscribe", "rawdb.raw_data") 
    .option("startingOffsets", "latest") 
    .load() 
    .select($"value".as[Array[Byte]]) 
    .map(avroDeserialize(_)) 
    .filter(some logic).select(some logic) 
    .writeStream.format("csv").option("checkpointLocation", "/user/root/chk").option("path", "/user/root/testdir").start() 

私は最後の行を変更した場合

.writeStream.format("csv")...

次のようにJDBCのforeachシンクに

val url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=x.x.x.x)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=fastdb)))" 
val user = "user"; 
val pwd = "password"; 
val writer = new JDBCSink(url, user, pwd) 
.writeStream.foreach(writer).outputMode("append").start() 

遅れショーアップ。

foreachループメカニックによって発生する可能性が最も高い問題は、バッチモードではなく、Oracle DBAとしても数千行のように扱うことです。アイドルイベントの場合過剰なコミットは既にconnection.setAutoCommit(false)に設定することで回避しようとしていますが、どんな提案も非常に感謝しています。

答えて

1

アプリケーションで最も長い時間を取っている実際のプロファイルはありませんが、ForeachWriterを使用すると、実行ごとにJDBC接続を効果的に閉じて再度開くことが原因であると考えられます。それはForeachWriterの仕組みです。

私はそれを使用する代わりに、接続のオープンまたはクローズの方法を制御するJDBCのカスタムSinkを記述することをお勧めします。

実現可能なアプローチを確認するには、pull request to add a JDBC driver to Sparkがあります。

+1

を使用していますまあ、私はちょうどオラクルリスナーのログ、重い接続を確認し、そこに閉じます。あまりにもスパークが遅くない、オラクルが悪いです、foreach作家だけがこのビジネスニーズのための機能ではありません。フードの下でスパークがrdd(不変)を使用しているので、内部バッファまたは "foreach"データをキャッシュしてバッチでそれらを送信する何かを設定する方法はないと思います。私はcacaulated結果を別のカフカに書き直し、RDBMSに接続するバッチモードの他のツールを使用することを考えています。 –

+0

@dalinqin接続を開いたままにしておくと問題はないので、カスタムの「シンク」を使用するとこの問題が解決されることに注意してください。 –

+0

いくつかのサンプルスクリプトを共有できますか?とにかくstructストリーミングを使用している場合は、foreach作者にjdbcの宛先に書き込むようにしなければなりませんか?私が間違っていれば私を修正してください。 ForeachWriterインターフェイスを実装する必要があります。 –

1

別のカフカトピックに結果を注入して問題を解決し、新しいトピックから読み込んだ別のプログラムをバッチでデータベースに書き込んで書き込んでください。

私は次回のリリースで、jdbcシンクを提供し、バッチサイズを設定するパラメータをいくつか持っていると思います。

書き込みを別のトピックに:

主なコードは以下の通りである

.writeStream.format("kafka") 
    .option("kafka.bootstrap.servers", "x.x.x.x:9092") 
    .option("topic", "fastdbtest") 
    .option("checkpointLocation", "/user/root/chk") 
    .start() 

は、トピックを読み、データベースへの書き込み、私はあなたがあるC3P0接続プールに

lines.foreachRDD(rdd => { 
    if (!rdd.isEmpty) { 
    rdd.foreachPartition(partitionRecords => { 
     //get a connection from connection pool 
     val conn = ConnManager.getManager.getConnection 
     val ps = conn.prepareStatement("insert into sparkdb.t_cf(ENTITYID,CLIENTMAC,STIME,FLAG) values(?,?,?,?)") 
     try { 
     conn.setAutoCommit(false) 
     partitionRecords.foreach(record => { 
      insertIntoDB(ps, record) 
     } 
     ) 
     ps.executeBatch() 
     conn.commit() 
     } catch { 
     case e: Exception =>{} 
     // do some log 
     } finally { 
     ps.close() 
     conn.close() 
     } 
    }) 
    } 
}) 
関連する問題