私は、spark 2.2 struct streamingを使ってkafka msgをoracleデータベースに読み込むプロジェクトに入っています。 kafkaへのメッセージの流れは約4000-6000メッセージ/秒です。spark 2.2 struct streaming foreach writer jdbcシンク・ラグ
hdfsファイルシステムをシンク宛先として使用すると、正常に動作します。 foreach jdbc writerを使用すると、時間の経過とともに大きな遅延が発生します。私は遅れがforeachループによって引き起こされると思います。
JDBCシンククラス(単独で、クラスファイルをスタンド):
class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
val driver = "oracle.jdbc.driver.OracleDriver"
var connection: java.sql.Connection = _
var statement: java.sql.PreparedStatement = _
val v_sql = "insert INTO sparkdb.t_cf(EntityId,clientmac,stime,flag,id) values(?,?,to_date(?,'YYYY-MM-DD HH24:MI:SS'),?,stream_seq.nextval)"
def open(partitionId: Long, version: Long): Boolean = {
Class.forName(driver)
connection = java.sql.DriverManager.getConnection(url, user, pwd)
connection.setAutoCommit(false)
statement = connection.prepareStatement(v_sql)
true
}
def process(value: org.apache.spark.sql.Row): Unit = {
statement.setString(1, value(0).toString)
statement.setString(2, value(1).toString)
statement.setString(3, value(2).toString)
statement.setString(4, value(3).toString)
statement.executeUpdate()
}
def close(errorOrNull: Throwable): Unit = {
connection.commit()
connection.close
}
}
シンク部分:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "namenode:9092").option("fetch.message.max.bytes", "50000000").option("kafka.max.partition.fetch.bytes", "50000000")
.option("subscribe", "rawdb.raw_data")
.option("startingOffsets", "latest")
.load()
.select($"value".as[Array[Byte]])
.map(avroDeserialize(_))
.filter(some logic).select(some logic)
.writeStream.format("csv").option("checkpointLocation", "/user/root/chk").option("path", "/user/root/testdir").start()
私は最後の行を変更した場合
次のようにJDBCのforeachシンクに
.writeStream.format("csv")...
:
val url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=x.x.x.x)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=fastdb)))"
val user = "user";
val pwd = "password";
val writer = new JDBCSink(url, user, pwd) .writeStream.foreach(writer).outputMode("append").start()
遅れショーアップ。
foreachループメカニックによって発生する可能性が最も高い問題は、バッチモードではなく、Oracle DBAとしても数千行のように扱うことです。アイドルイベントの場合過剰なコミットは既にconnection.setAutoCommit(false)
に設定することで回避しようとしていますが、どんな提案も非常に感謝しています。
を使用していますまあ、私はちょうどオラクルリスナーのログ、重い接続を確認し、そこに閉じます。あまりにもスパークが遅くない、オラクルが悪いです、foreach作家だけがこのビジネスニーズのための機能ではありません。フードの下でスパークがrdd(不変)を使用しているので、内部バッファまたは "foreach"データをキャッシュしてバッチでそれらを送信する何かを設定する方法はないと思います。私はcacaulated結果を別のカフカに書き直し、RDBMSに接続するバッチモードの他のツールを使用することを考えています。 –
@dalinqin接続を開いたままにしておくと問題はないので、カスタムの「シンク」を使用するとこの問題が解決されることに注意してください。 –
いくつかのサンプルスクリプトを共有できますか?とにかくstructストリーミングを使用している場合は、foreach作者にjdbcの宛先に書き込むようにしなければなりませんか?私が間違っていれば私を修正してください。 ForeachWriterインターフェイスを実装する必要があります。 –