2017-08-18 10 views
0

私はScalaを使用しています Sparkストリーミングを試みましたが、15分以上ストリーミングジョブがクラッシュした場合、データ損失が発生します。15分間隔のスパーク実行バッチジョブ

私はちょうどバッチジョブでチェックポイントを手動で保持する方法を知りたいですか?入力データの

ディレクトリは、次の

データのように見えます - > 20170818 - >(タイムスタンプ) - >(多くの.jsonファイル)

データは5分ごとにアップロードされています。

ありがとうございます!

答えて

0

構造化ストリーミングでreadStream機能を使用すると、ディレクトリを監視して新しいファイルを取得できます。 Sparkは自動的にチェックポイントとトラッキングを処理します。ここで

val ds = spark.readStream 
    .format("text") 
    .option("maxFilesPerTrigger", 1) 
    .load(logDirectory) 

は、トピックに関する追加資料についてへのリンクです:https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-FileStreamSource.html

私は個人的に使用されているフォーマット(「テキスト」)しかし、あなたはフォーマット(「JSON」)に変更することができるはず、ここにありますjson形式の詳細:https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html

+0

こんにちは、それに分制限を追加する方法を理解できませんか?たとえば、15分ごとにトリガーするにはどうすればいいですか? –

+0

.trigger(ProcessingTime( "10 seconds"))、このオプションはwriteStreamでのみ使用できます。これはおそらく遅延評価によるものです(アクションが実行されると実行が開始される).https://databricks.com/blog/2017 /01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html – sgireddy

関連する問題