2017-10-06 9 views
1

私はカフカから構造化ストリーミングをしようとしています。 HDFSにチェックポイントを格納する予定です。私はSparkストリーミングのためにHDFSにチェックポイントを保存しないことを推奨するclouderaブログを読んでいます。構造体ストリーミングチェックポイントでも同じ問題です。 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/カフカストラクチャードストリーミングチェックポイント

スパークプログラムが一定時間停止している場合、チェックポイントディレクトリから最新のオフセットを取得し、そのオフセット後にデータをロードするにはどうすればよいですか。 以下に示すように、ディレクトリにチェックポイントを格納しています。

df.writeStream\ 
     .format("text")\ 
     .option("path", '\files') \ 
     .option("checkpointLocation", 'checkpoints\chkpt') \ 
     .start() 

更新:

これは私の構造化ストリーミングプログラムは、カフカのメッセージを読み取り解凍し、HDFSへの書き込みです。クエリで

df = spark \ 
     .readStream \ 
     .format("kafka") \ 
     .option("kafka.bootstrap.servers", KafkaServer) \ 
     .option("subscribe", KafkaTopics) \ 
     .option("failOnDataLoss", "false")\ 
     .load() 
Transaction_DF = df.selectExpr("CAST(value AS STRING)") 
Transaction_DF.printSchema() 

decomp = Transaction_DF.select(zip_extract("value").alias("decompress")) 
#zip_extract is a UDF to decompress the stream 

query = decomp.writeStream\ 
    .format("text")\ 
    .option("path", \Data_directory_inHDFS) \ 
    .option("checkpointLocation", \pathinDHFS\) \ 
    .start() 

query.awaitTermination() 
+0

は、あなたのブログは、あなたがHDFS上のチェックポイントを保存しないことをお勧めしますことを確認していますか?それはかなり奇妙です。リンクがありますか?ストラクチャードストリーミングの質問では、同じチェックポイントディレクトリを使用して同じコードを実行するだけで、構造化されたストリーミングが最後の失敗のオフセットを取得してから再開します。 – zsxwing

+0

@zsxwingこれはclouderaのブログリンクですhttps://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/私はストリーミングプログラムを手動で殺しました。もう一度起動し、受信したメッセージが処理された後にのみ処理を開始します。それは停止したメッセージを無視して再度処理しませんでした –

+0

ドライバログを見て、 'logInfo(s" GetBatch start = $ start、end = $ end ")'で出力されたログを探してください。それはクエリが処理したものを教えてくれるはずです。 – zsxwing

答えて

0

、寄木細工のようないくつかの形式でHDFSのようないくつかの永続ストレージに結果を書き込み中にチェックポイントを適用してみてください。それは私のためにうまくいった。

あなたのコードを共有して、さらに詳細を見ることができますか?

+0

質問の更新として完全なコードを追加しました。チェックポイントファイルからどのように最新のオフセットを取得しましたか? –

+0

これを実行している間に、HDFSのチェックポイントが時間の経過と共にますます多くのストレージを使用しないようにするにはどうすればよいですか?それを管理するために使用できる「クリーンアップ」設定はありますか? –

2

長期保存(HDFS、AWS S3など)でのチェックポイントの保存が最も適しています。私はプロパティ "failOnDataLoss"は、ベストプラクティスではないので、falseに設定してはならないという点をここに追加したいと思います。データの損失は、誰も望みたくないものです。あなたは正しい道にいます。

+0

これを実行している間に、HDFSのチェックポイントが時間の経過と共にますます多くのストレージを使用しないようにするにはどうすればよいですか?それを管理するために使用できる「クリーンアップ」設定はありますか? –

+0

私が知る限り、チェックポイントは多くのデータを保存しません。カフカのようにオフセットを保存するので、ストレージの問題を心配する必要はありません。チェックポイントをクリアする場合は、メンテナンス中に行うか、そのためのスケジューラ。 –

+0

SparkConfで "spark.cleaner.referenceTracking.cleanCheckpoints"、 "true"を使用しています。クリーンチェックポイントで動作します。 –

0

人工的には、Hbase、Kafka、HDFS、またはZookeeperのいずれかのオフセット管理を維持することをお勧めしました。あなたがまた、HDFSなどのストレージ システム内のオフセットを格納できることを言及する価値がある。HDFSにオフセットを格納すると、上記のオプションに比べてあまり人気のアプローチ であるHDFSは他の に比べてより長い待ち時間を持っているよう

」 ZooKeeperやHBaseのようなシステムです。 "

あなたはどのように既存のチェックポイントからのクエリを再起動するスパークのドキュメントで見つけることができます:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing