Spark 2.2のSpark Structuredストリーミングを使用して、HDFSディレクトリからKafkaトピックにストリームファイルを配信しています。私は、このトピックに書いているデータのカフカオフセットをキャプチャしたいと思います。Spark Structured Streamingでの書き込み時のKafkaオフセットのキャプチャ
私はカフカに書き込むには
val write = jsonDF
.writeStream.format("kafka")
.option("checkpointLocation", Config().getString(domain + ".kafkaCheckpoint"))
.option("kafka.bootstrap.servers", Config().getString(domain + ".kafkaServer"))
.option("topic", Config().getString(domain + ".kafkaTopic"))
.start()
を使用しています。
私が取得した情報は、オフセットはカフカに作成されると相関しないストリームの進捗情報をキャプチャする
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println("Query started: " + queryStarted.id)
}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
println("Query terminated: " + queryTerminated.id)
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
println("Query made progress: " + queryProgress.progress)
}
})
を利用しています。
ストリームによって提供される情報が、私が利用しているファイルストリームに関するもので、カフカに書かれているものとは関係がないためです。
私たちがKafkaに書き込むときに生成されているオフセット情報をSpark Structure Streamingでキャプチャする方法はありますか?
例を追加: 私はちょうど私が手にトピックを作成した後に3行のソース1からのデータを実行する場合:
ラン1: スタートオフセット:ヌルを、エンドオフセット:{「logOffset」:0} スタートオフセット:{"logOffset":0}、終了オフセット:{"logOffset":0}
Kafka Says:
ruwe:2:1
ruwe:1:1
ruwe:0:1
実行2;
Start Offset: {"logOffset":0}, End offset: {"logOffset":1}
Start Offset: {"logOffset":1}, End offset: {"logOffset":1}
Kafka Says:
ruwe:2:2
ruwe:1:2
ruwe:0:2
ラン3:私はその後、別のソースから同じプログラムでデータを走った。これは、そのスパークがオンに基づいて情報を報告していることを示し
Start Offset: null, End offset: {"logOffset":0}
Start Offset: {"logOffset":0}, End offset: {"logOffset":0}
and of course Kafka continued to increment
を受け
Start Offset: {"logOffset":1}, End offset: {"logOffset":2}
Start Offset: {"logOffset":2}, End offset: {"logOffset":2}
Kafka Says:
ruwe:2:3
ruwe:1:3
ruwe:0:3
ソース
ターゲットで何が作成されたのか知りたいです。
これはターゲット(Kafka)ではないソースのバックオフセットを報告しているようです。私は同じソースから実行し続ける場合、私の数字が上がるが、私は2番目のソースからデータを実行するとき、それらが報告されているカフカオフセットではないことを示す最初の数字を報告する。 – SRuwe
@SRuweターゲットのオフセットが必要な場合は、 'SinkProgress.json' –