ウォーターマークは、ミニバッファ中に固定値です。あなたの例では、X、Y、Zは同じミニバッチで処理されるため、このレコードに使用される透かしは午前9時20分になります。そのミニバッチの完了後、ウォーターマークは午後7時に更新されます。透かし機能を実装する機能SPARK-18124ためdesign docからの引用以下
:
は、私たちのトリガーベースの実行の低下境界を計算するには、我々は次のことを行う必要があります。
すべてのトリガで
- 、データを集約しながら、トリガが完了した後、我々はまた、トリガデータ
- にイベント時間の最大値をスキャンし、計算透かしトリガーの前に= MAX(イベント時、中の最大イベント時間トリガー) - しきい値
おそらくシミュレーションは、より説明のようになります。
import org.apache.hadoop.fs.Path
import java.sql.Timestamp
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.ProcessingTime
val dir = new Path("/tmp/test-structured-streaming")
val fs = dir.getFileSystem(sc.hadoopConfiguration)
fs.mkdirs(dir)
val schema = StructType(StructField("vilue", StringType) ::
StructField("timestamp", TimestampType) ::
Nil)
val eventStream = spark
.readStream
.option("sep", ";")
.option("header", "false")
.schema(schema)
.csv(dir.toString)
// Watermarked aggregation
val eventsCount = eventStream
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "1 hour"))
.count
def writeFile(path: Path, data: String) {
val file = fs.create(path)
file.writeUTF(data)
file.close()
}
// Debug query
val query = eventsCount.writeStream
.format("console")
.outputMode("complete")
.option("truncate", "false")
.trigger(ProcessingTime("5 seconds"))
.start()
writeFile(new Path(dir, "file1"), """
|A;2017-08-09 10:00:00
|B;2017-08-09 10:10:00
|C;2017-08-09 10:20:00""".stripMargin)
query.processAllAvailable()
val lp1 = query.lastProgress
// -------------------------------------------
// Batch: 0
// -------------------------------------------
// +---------------------------------------------+-----+
// |window |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3 |
// +---------------------------------------------+-----+
// lp1: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
// ...
// "numInputRows" : 3,
// "eventTime" : {
// "avg" : "2017-08-09T10:10:00.000Z",
// "max" : "2017-08-09T10:20:00.000Z",
// "min" : "2017-08-09T10:00:00.000Z",
// "watermark" : "1970-01-01T00:00:00.000Z"
// },
// ...
// }
writeFile(new Path(dir, "file2"), """
|Z;2017-08-09 20:00:00
|X;2017-08-09 12:00:00
|Y;2017-08-09 12:50:00""".stripMargin)
query.processAllAvailable()
val lp2 = query.lastProgress
// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +---------------------------------------------+-----+
// |window |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3 |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2 |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1 |
// +---------------------------------------------+-----+
// lp2: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
// ...
// "numInputRows" : 3,
// "eventTime" : {
// "avg" : "2017-08-09T14:56:40.000Z",
// "max" : "2017-08-09T20:00:00.000Z",
// "min" : "2017-08-09T12:00:00.000Z",
// "watermark" : "2017-08-09T09:20:00.000Z"
// },
// "stateOperators" : [ {
// "numRowsTotal" : 3,
// "numRowsUpdated" : 2
// } ],
// ...
// }
writeFile(new Path(dir, "file3"), "")
query.processAllAvailable()
val lp3 = query.lastProgress
// -------------------------------------------
// Batch: 2
// -------------------------------------------
// +---------------------------------------------+-----+
// |window |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3 |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2 |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1 |
// +---------------------------------------------+-----+
// lp3: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
// ...
// "numInputRows" : 0,
// "eventTime" : {
// "watermark" : "2017-08-09T19:00:00.000Z"
// },
// "stateOperators" : [ ],
// ...
// }
query.stop()
fs.delete(dir, true)
お知らせバッチ0はウォーターマーク1970-01-01 00:00:00
でどのように開始され、バッチ1はウォーターマーク2017-08-09 09:20:00
(最大イベント時間はバッチ0から1時間)で開始されました。バッチ2は、空の状態で、透かしを使用した2017-08-09 19:00:00
です。
私の理解では、Sparkは受信データを透かしフィールドでソートするので、Zは最後に表示されます。 – nonsleepr
これについての参考資料はありますか?私が理解しているところでは、Sparkは異なるKafkaパーティションのデータを並列に読み込みます。そのパーティションは、単一のパーティション内でのみ、データはシリアルで処理されます。 –
上記の私の前提は間違っています:ソートしていた私の(もっと複雑な例の)仕事の実行計画によって誤解されました。 – nonsleepr