私はカフカから構造化ストリーミングをしようとしています。 HDFSにチェックポイントを格納する予定です。私はSparkストリーミングのためにHDFSにチェックポイントを保存しないことを推奨するclouderaブログを読んでいます。構造体ストリーミングチェックポイントでも同じ問題です。 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/。カフカストラクチャードストリーミングチェックポイント
スパークプログラムが一定時間停止している場合、チェックポイントディレクトリから最新のオフセットを取得し、そのオフセット後にデータをロードするにはどうすればよいですか。 以下に示すように、ディレクトリにチェックポイントを格納しています。
df.writeStream\
.format("text")\
.option("path", '\files') \
.option("checkpointLocation", 'checkpoints\chkpt') \
.start()
更新:
これは私の構造化ストリーミングプログラムは、カフカのメッセージを読み取り解凍し、HDFSへの書き込みです。クエリで
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaServer) \
.option("subscribe", KafkaTopics) \
.option("failOnDataLoss", "false")\
.load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()
decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))
#zip_extract is a UDF to decompress the stream
query = decomp.writeStream\
.format("text")\
.option("path", \Data_directory_inHDFS) \
.option("checkpointLocation", \pathinDHFS\) \
.start()
query.awaitTermination()
は、あなたのブログは、あなたがHDFS上のチェックポイントを保存しないことをお勧めしますことを確認していますか?それはかなり奇妙です。リンクがありますか?ストラクチャードストリーミングの質問では、同じチェックポイントディレクトリを使用して同じコードを実行するだけで、構造化されたストリーミングが最後の失敗のオフセットを取得してから再開します。 – zsxwing
@zsxwingこれはclouderaのブログリンクですhttps://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/私はストリーミングプログラムを手動で殺しました。もう一度起動し、受信したメッセージが処理された後にのみ処理を開始します。それは停止したメッセージを無視して再度処理しませんでした –
ドライバログを見て、 'logInfo(s" GetBatch start = $ start、end = $ end ")'で出力されたログを探してください。それはクエリが処理したものを教えてくれるはずです。 – zsxwing