val kinesis = spark
.readStream
.format("kinesis")
.option("streams", streamName)
.option("endpointUrl", endpointUrl)
.option("initialPositionInStream", "earliest")
.option("format", "json")
.schema(<my-schema>)
.load
データが一意を持っているいくつかのIoTデバイスから来ているが、次のように私の接続があり、構造化されたストリーミング・フレームワークを使用してキネシスからデータを読み出すためにスパークストリーミングを使用しています次のようにID、私は、このIDによって、およびタイムスタンプフィールドの上にタンブリングウィンドウでデータを集計する必要があります。
val aggregateData = kinesis
.groupBy($"uid", window($"timestamp", "15 minute", "15 minute"))
.agg(...)
私が遭遇してる問題は、私はすべてのウィンドウは、ラウンドの時間に開始することを保証する必要があるということです(00:00:00,00:15:00など)、また、フル15分長い窓を含む行だけが私のシンクに出力することとしていることをダ保証は、私が現在やっていることはTHS postgreSQLWriterは、私は、PostgreSQLに各行を挿入するために作成したのStreamWriterである
val query = aggregateData
.writeStream
.foreach(postgreSQLWriter)
.outputMode("update")
.start()
.awaitTermination()
ですSGBD。どのようにして私のウインドウを正確に15分長くすることができ、開始時間は各デバイス固有のIDの丸め15分のタイムスタンプ値になるのですか?