2017-08-08 1 views
3

プログラミングガイドでは、ストラクチャードストリーミングは、適切なソース/シンクを使用して正確に1回のセマンティクスをエンドツーエンドで保証します。ストラクチャードストリーミング:透かし対正確に1回のセマンティクス

しかし、ジョブがクラッシュしてウォーターマークが適用されたときの仕組みを理解できません。

以下は私が現在どのように動作しているかの例です。私が誤解しているところで私を修正してください。前もって感謝します!

例:

スパーク求人:1時間の透かしと、各1時間の枠で#イベントをカウントします。

メッセージ:

  • A - タイムスタンプ10:00
  • B - タイムスタンプ10:10
  • C - タイムスタンプ10:20
  • X - タイムスタンプ正午
  • Y - タイムスタンプ12:50
  • Zタイムスタンプ8pm

私たちはジョブを開始し、ソースからA、B、Cを読み込み、私たちがシンクに書き込む前の10時30分にジョブがクラッシュします。

午後6時にジョブが復旧し、保存されたチェックポイント/ WALを使用してA、B、Cを再処理することがわかります。最終的なカウントは、10-11amのウィンドウでは3です。

次に、Kafka、X、Y、Zからの新しいメッセージは、異なるパーティションに属しているため、並行して読み込みます。 Zが最初に処理されるので、maxイベントのタイムスタンプは8pmに設定されます。ジョブでXとYが読み取られると、ウォーターマーク(8pm - 1時間= 7pm)の後ろにあるため、古いデータとして破棄されます。最終的なカウントは、午後8時〜午後9時には1であり、ジョブは12-1pmウィンドウについては何も報告しません。私たちは、

は、このシナリオでは正確です--- XとY

---エンド例えばデータを失ってしまいましたか? もしそうなら、kafka-> sparkから正常に流しているときに、遅れ/順不同のデータを処理するのに1時間のウォーターマークが充分であるかもしれませんが、スパークジョブがダウン/カフカ接続が長時間失われたときではありません。データ損失を避ける唯一の選択肢は、仕事が絶望すると予想するよりも長い透かしを使用することでしょうか?

+0

私の理解では、Sparkは受信データを透かしフィールドでソートするので、Zは最後に表示されます。 – nonsleepr

+0

これについての参考資料はありますか?私が理解しているところでは、Sparkは異なるKafkaパーティションのデータを並列に読み込みます。そのパーティションは、単一のパーティション内でのみ、データはシリアルで処理されます。 –

+0

上記の私の前提は間違っています:ソートしていた私の(もっと複雑な例の)仕事の実行計画によって誤解されました。 – nonsleepr

答えて

3

ウォーターマークは、ミニバッファ中に固定値です。あなたの例では、X、Y、Zは同じミニバッチで処理されるため、このレコードに使用される透かしは午前9時20分になります。そのミニバッチの完了後、ウォーターマークは午後7時に更新されます。透かし機能を実装する機能SPARK-18124ためdesign docからの引用以下

は、私たちのトリガーベースの実行の低下境界を計算するには、我々は次のことを行う必要があります。

すべてのトリガで
  • 、データを集約しながら、トリガが完了した後、我々はまた、トリガデータ
  • にイベント時間の最大値をスキャンし、計算透かしトリガーの前に= MAX(イベント時、中の最大イベント時間トリガー) - しきい値

おそらくシミュレーションは、より説明のようになります。

import org.apache.hadoop.fs.Path 
import java.sql.Timestamp 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.streaming.ProcessingTime 

val dir = new Path("/tmp/test-structured-streaming") 
val fs = dir.getFileSystem(sc.hadoopConfiguration) 
fs.mkdirs(dir) 

val schema = StructType(StructField("vilue", StringType) :: 
         StructField("timestamp", TimestampType) :: 
         Nil) 

val eventStream = spark 
    .readStream 
    .option("sep", ";") 
    .option("header", "false") 
    .schema(schema) 
    .csv(dir.toString) 

// Watermarked aggregation 
val eventsCount = eventStream 
    .withWatermark("timestamp", "1 hour") 
    .groupBy(window($"timestamp", "1 hour")) 
    .count 

def writeFile(path: Path, data: String) { 
    val file = fs.create(path) 
    file.writeUTF(data) 
    file.close() 
} 

// Debug query 
val query = eventsCount.writeStream 
    .format("console") 
    .outputMode("complete") 
    .option("truncate", "false") 
    .trigger(ProcessingTime("5 seconds")) 
    .start() 

writeFile(new Path(dir, "file1"), """ 
    |A;2017-08-09 10:00:00 
    |B;2017-08-09 10:10:00 
    |C;2017-08-09 10:20:00""".stripMargin) 

query.processAllAvailable() 
val lp1 = query.lastProgress 

// ------------------------------------------- 
// Batch: 0 
// ------------------------------------------- 
// +---------------------------------------------+-----+ 
// |window          |count| 
// +---------------------------------------------+-----+ 
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3 | 
// +---------------------------------------------+-----+ 

// lp1: org.apache.spark.sql.streaming.StreamingQueryProgress = 
// { 
// ... 
// "numInputRows" : 3, 
// "eventTime" : { 
//  "avg" : "2017-08-09T10:10:00.000Z", 
//  "max" : "2017-08-09T10:20:00.000Z", 
//  "min" : "2017-08-09T10:00:00.000Z", 
//  "watermark" : "1970-01-01T00:00:00.000Z" 
// }, 
// ... 
// } 


writeFile(new Path(dir, "file2"), """ 
    |Z;2017-08-09 20:00:00 
    |X;2017-08-09 12:00:00 
    |Y;2017-08-09 12:50:00""".stripMargin) 

query.processAllAvailable() 
val lp2 = query.lastProgress 

// ------------------------------------------- 
// Batch: 1 
// ------------------------------------------- 
// +---------------------------------------------+-----+ 
// |window          |count| 
// +---------------------------------------------+-----+ 
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3 | 
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2 | 
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1 | 
// +---------------------------------------------+-----+ 

// lp2: org.apache.spark.sql.streaming.StreamingQueryProgress = 
// { 
// ... 
// "numInputRows" : 3, 
// "eventTime" : { 
//  "avg" : "2017-08-09T14:56:40.000Z", 
//  "max" : "2017-08-09T20:00:00.000Z", 
//  "min" : "2017-08-09T12:00:00.000Z", 
//  "watermark" : "2017-08-09T09:20:00.000Z" 
// }, 
// "stateOperators" : [ { 
//  "numRowsTotal" : 3, 
//  "numRowsUpdated" : 2 
// } ], 
// ... 
// } 

writeFile(new Path(dir, "file3"), "") 

query.processAllAvailable() 
val lp3 = query.lastProgress 

// ------------------------------------------- 
// Batch: 2 
// ------------------------------------------- 
// +---------------------------------------------+-----+ 
// |window          |count| 
// +---------------------------------------------+-----+ 
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3 | 
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2 | 
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1 | 
// +---------------------------------------------+-----+ 

// lp3: org.apache.spark.sql.streaming.StreamingQueryProgress = 
// { 
// ... 
// "numInputRows" : 0, 
// "eventTime" : { 
//  "watermark" : "2017-08-09T19:00:00.000Z" 
// }, 
// "stateOperators" : [ ], 
// ... 
// } 

query.stop() 
fs.delete(dir, true) 

お知らせバッチ0はウォーターマーク1970-01-01 00:00:00でどのように開始され、バッチ1はウォーターマーク2017-08-09 09:20:00(最大イベント時間はバッチ0から1時間)で開始されました。バッチ2は、空の状態で、透かしを使用した2017-08-09 19:00:00です。

+0

詳細な回答をいただきありがとうございました。 –

1

Zが最初に処理されるため、最大イベントタイムスタンプは8pmに設定されます。

これは正しいです。 Zが最初に計算されても、透かしは、現在のクエリ反復における最大タイムスタンプから減算される。つまり、08:00 PMは透かし時間を差し引く時間として設定されます。つまり、12:00と12:50は破棄されます。

From the documentation

特定ウィンドウの時間Tで開始する、エンジン状態を維持し、遅いデータ(エンジンによって見られる最大イベント時間 - 遅く閾値> T)まで状態を更新することを可能にする


は、データの損失を回避するための唯一のオプションは、あなたがこれまで

のためにダウンするジョブの予想よりも長い透かしを使用することです

必ずしもそうではありません。 Kafkaのクエリごとに読み取るデータの最大量を100項目に設定すると仮定します。小規模なバッチを読んで、各パーティションから連続して読んでいる場合、各バッチの最大タイムスタンプはブローカの最新のメッセージの最大時間ではない可能性があります。つまり、これらのメッセージは失われません。

+0

申し訳ありません最初の点を理解しているかどうかはわかりません。 'withWatermark' [documentation](https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/Dataset.html#withWatermark-java.lang.String-java)を参照してください。 lang.String-)は、最大イベント時間が現在のクエリのすべてのパーティションにわたって設定されていることを示します。その場合、透かしを引く時間は20:00ではないはずですから、12:50と12:00は破棄されますか? 2番目の点では、それは本当ですが、スロットルが問題を緩和するのに役立ちます。それぞれのバッチがすべて透かしに収まるかどうかはまだ分かりませんが、やはりギャンブルのようです。 –

+0

@RayJウォーターマークは、ミニバッチ中は固定値です。あなたの例では、X、Y、Zは同じminibatchで処理されるため、それらの透かしは午前9時20分になります。そのミニバッチの完了後、ウォーターマークは午後7時に更新されます。 – nonsleepr

+0

@RayJ最大イベント時間は午後8時ではなく午後12時10分です。なぜ最大値でない値からそれを減算するのでしょうか? –

関連する問題