ウィンドウ期間を使用して構造化ストリーミング集約を実行したいと考えています。これに続くデータスキーマを考えると、目的は、ユーザーに基づいて最新のイベントによってフィルタリングすることです。次に、各ロケーションのイベントタイプの数を集計します。スパークストラクチャードストリーミング - 最新の集計での重複排除の方法
time location user type
1 A 1 one
2 A 1 two
1 B 2 one
2 B 2 one
1 A 3 two
1 A 4 one
出力例:
のようなもの次
val aggTypes = df
.select($"location", $"time", $"user", $"type")
.groupBy($"user")
.agg(max($"timestamp") as 'timestamp)
.select("*")
.withWatermark("timestamp", conf.kafka.watermark.toString + " seconds")
.groupBy(functions.window($"timestamp", DataConstant.t15min.toString + " seconds", DataConstant.t1min.toString + " seconds", $"location")
.agg(count(when($"type" === "one", $"type")) as 'countOne, count(when($"type" === "two", $"type" as 'countTwo)))
.drop($"window")
ように構成されたストリーミングは、ストリーミングデータフレームでサポートされていない複数の集計と非時間ベースのウィンドウをサポートしていません。 /データセット。私はそれが1つのストリーミングクエリで所望の出力を達成することが可能かどうかはわかりません。
何か助けていただければ幸いです。