2
私は以下のコードで、各時間ウィンドウためカフカからアイテムの数をカウントにスパーク構造のストリーミングを使用しようとしている:(私は毎分に一度、ことが期待時間ウィンドウごとにアイテムを数えるには?
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window
object Counter extends App {
val dateFormatter = new SimpleDateFormat("HH:mm:ss")
val spark = ...
import spark.implicits._
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("subscribe", ...)
.load()
val windowDuration = "5 minutes"
val counts = df
.select("value").as[Array[Byte]]
.map(decodeTimestampFromKafka).toDF("timestamp")
.select($"timestamp" cast "timestamp")
.withWatermark("timestamp", windowDuration)
.groupBy(window($"timestamp", windowDuration, "1 minute"))
.count()
.as[((Long, Long), Long)]
val writer = new ForeachWriter[((Long, Long), Long)] {
var partitionId: Long = _
var version: Long = _
def open(partitionId: Long, version: Long): Boolean = {
this.partitionId = partitionId
this.version = version
true
}
def process(record: ((Long, Long), Long)): Unit = {
val ((start, end), docs) = record
val startDate = dateFormatter.format(new Date(start))
val endDate = dateFormatter.format(new Date(end))
val now = dateFormatter.format(new Date)
println(s"$now:$this|$partitionId|$version: ($startDate, $endDate) $docs")
}
def close(errorOrNull: Throwable): Unit = {}
}
val query = counts
.repartition(1)
.writeStream
.outputMode("complete")
.foreach(writer)
.start()
query.awaitTermination()
def decodeTimestampFromKafka(bytes: Array[Byte]): Long = ...
}
スライド期間)、(唯一の集約キーはウィンドウであるため)を出力し、最後の5分間に(ウィンドウの継続時間)のアイテム数がとなります。 はしかし、それはこのサンプルのように複数のレコード分あたり2〜3回、出力:
...
22:44:34|[email protected]|0|8: (22:43:20, 22:43:20) 383
22:44:34|[email protected]|0|8: (22:43:18, 22:43:19) 435
22:44:34|[email protected]|0|8: (22:42:33, 22:42:34) 395
22:44:34|[email protected]|0|8: (22:43:14, 22:43:14) 435
22:44:34|[email protected]|0|8: (22:43:09, 22:43:09) 437
22:44:34|[email protected]|0|8: (22:43:19, 22:43:19) 411
22:44:34|[email protected]|0|8: (22:43:07, 22:43:07) 400
22:44:34|[email protected]|0|8: (22:43:17, 22:43:17) 392
22:44:44|[email protected]|0|9: (22:43:37, 22:43:38) 420
22:44:44|[email protected]|0|9: (22:43:25, 22:43:25) 395
22:44:44|[email protected]|0|9: (22:43:22, 22:43:22) 416
22:44:44|[email protected]|0|9: (22:43:00, 22:43:00) 438
22:44:44|[email protected]|0|9: (22:43:41, 22:43:41) 426
22:44:44|[email protected]|0|9: (22:44:13, 22:44:13) 132
22:44:44|[email protected]|0|9: (22:44:02, 22:44:02) 128
22:44:44|[email protected]|0|9: (22:44:09, 22:44:09) 120
...
は、「追加」するために、出力モードを変更する動作を変更するようだが、それでもこれまで私が期待したものから。
私の前提がうまくいかないのですか?上記のコードが与えられた場合、サンプル出力はどのように解釈または使用されるべきですか?