私はScalaを使用しています Sparkストリーミングを試みましたが、15分以上ストリーミングジョブがクラッシュした場合、データ損失が発生します。15分間隔のスパーク実行バッチジョブ
私はちょうどバッチジョブでチェックポイントを手動で保持する方法を知りたいですか?入力データの
ディレクトリは、次の
データのように見えます - > 20170818 - >(タイムスタンプ) - >(多くの.jsonファイル)
データは5分ごとにアップロードされています。
ありがとうございます!
私はScalaを使用しています Sparkストリーミングを試みましたが、15分以上ストリーミングジョブがクラッシュした場合、データ損失が発生します。15分間隔のスパーク実行バッチジョブ
私はちょうどバッチジョブでチェックポイントを手動で保持する方法を知りたいですか?入力データの
ディレクトリは、次の
データのように見えます - > 20170818 - >(タイムスタンプ) - >(多くの.jsonファイル)
データは5分ごとにアップロードされています。
ありがとうございます!
構造化ストリーミングでreadStream機能を使用すると、ディレクトリを監視して新しいファイルを取得できます。 Sparkは自動的にチェックポイントとトラッキングを処理します。ここで
val ds = spark.readStream
.format("text")
.option("maxFilesPerTrigger", 1)
.load(logDirectory)
は、トピックに関する追加資料についてへのリンクです:https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-FileStreamSource.html
私は個人的に使用されているフォーマット(「テキスト」)しかし、あなたはフォーマット(「JSON」)に変更することができるはず、ここにありますjson形式の詳細:https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
こんにちは、それに分制限を追加する方法を理解できませんか?たとえば、15分ごとにトリガーするにはどうすればいいですか? –
.trigger(ProcessingTime( "10 seconds"))、このオプションはwriteStreamでのみ使用できます。これはおそらく遅延評価によるものです(アクションが実行されると実行が開始される).https://databricks.com/blog/2017 /01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html – sgireddy