2017-11-13 15 views
3

Spark 2.2のSpark Structuredストリーミングを使用して、HDFSディレクトリからKafkaトピックにストリームファイルを配信しています。私は、このトピックに書いているデータのカフカオフセットをキャプチャしたいと思います。Spark Structured Streamingでの書き込み時のKafkaオフセットのキャプチャ

私はカフカに書き込むには

val write = jsonDF 
.writeStream.format("kafka") 
.option("checkpointLocation", Config().getString(domain + ".kafkaCheckpoint")) 
.option("kafka.bootstrap.servers", Config().getString(domain + ".kafkaServer")) 
.option("topic", Config().getString(domain + ".kafkaTopic")) 
.start() 

を使用しています。

私が取得した情報は、オフセットはカフカに作成されると相関しないストリームの進捗情報をキャプチャする

spark.streams.addListener(new StreamingQueryListener() { 
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { 
    println("Query started: " + queryStarted.id) 
    } 
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { 
    println("Query terminated: " + queryTerminated.id) 
    } 
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { 
    println("Query made progress: " + queryProgress.progress) 
    } 
}) 

を利用しています。

ストリームによって提供される情報が、私が利用しているファイルストリームに関するもので、カフカに書かれているものとは関係がないためです。

私たちがKafkaに書き込むときに生成されているオフセット情報をSpark Structure Streamingでキャプチャする方法はありますか?

例を追加: 私はちょうど私が手にトピックを作成した後に3行のソース1からのデータを実行する場合:
ラン1: スタートオフセット:ヌルを、エンドオフセット:{「​​logOffset」:0} スタートオフセット:{"logOffset":0}、終了オフセット:{"logOffset":0}

Kafka Says: 
ruwe:2:1 
ruwe:1:1 
ruwe:0:1 

実行2;

Start Offset: {"logOffset":0}, End offset: {"logOffset":1} 
    Start Offset: {"logOffset":1}, End offset: {"logOffset":1} 

Kafka Says: 
ruwe:2:2 
ruwe:1:2 
ruwe:0:2 

ラン3:私はその後、別のソースから同じプログラムでデータを走った。これは、そのスパークがオンに基づいて情報を報告していることを示し

Start Offset: null, End offset: {"logOffset":0} 
    Start Offset: {"logOffset":0}, End offset: {"logOffset":0} 

    and of course Kafka continued to increment 

を受け

Start Offset: {"logOffset":1}, End offset: {"logOffset":2} 
    Start Offset: {"logOffset":2}, End offset: {"logOffset":2} 

Kafka Says: 
ruwe:2:3 
ruwe:1:3 
ruwe:0:3 

ソース

ターゲットで何が作成されたのか知りたいです。

答えて

1

私たちがKafkaに書き込むときに生成されるオフセット の情報をキャプチャする方法はありますか?

はい、onQueryProgressに、あなたはArray[SourceProgress]あるStreamingQueryProgress.sourcesを見てする必要があります。それはあなたが解析できるJSONsある二つの文字列、startOffsetendOffsetを、持っている:

sparkSession.streams.addListener(new StreamingQueryListener {override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ??? 

    override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { 
    val source = event.progress.sources.headOption 
    source.map(src => println(s"Start Offset: ${src.startOffset}, End offset: ${src.endOffset}")) 
    } 

    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit =() 
}) 

JSONは以下の構造を有する:スパーク構造ストリーミング用のコード具体的カフカKafkaWriterを読んだ後

"startOffset" : { 
    "topic-name" : { 
    "0" : 1, 
    "1" : 22, 
    "2" : 419, 
    } 
}, 
"endOffset" : { 
    "topic-name" : { 
    "0" : 10, 
    "1" : 100, 
    "2" : 1000 
    } 
} 
+0

これはターゲット(Kafka)ではないソースのバックオフセットを報告しているようです。私は同じソースから実行し続ける場合、私の数字が上がるが、私は2番目のソースからデータを実行するとき、それらが報告されているカフカオフセットではないことを示す最初の数字を報告する。 – SRuwe

+0

@SRuweターゲットのオフセットが必要な場合は、 'SinkProgress.json' –

0

を、KafkaWriteTaskおよびCachedKafkaProducerの場合、SparkはコールバックでKafkaProducerから返されるオフセットを消費しません。彼らが定義するコールバックは例外をキャプチャするだけです。これに基づいて、私は現在のリリース2.2でそれを行うことはできないと言っています。

これらの情報が提供する情報はすべて、ターゲットではなくクエリの発信元です。

関連する問題