2017-09-20 5 views
4

私はスパークストリーミング保証特定の開始ウィンドウの時間

val kinesis = spark 
    .readStream 
    .format("kinesis") 
    .option("streams", streamName) 
    .option("endpointUrl", endpointUrl) 
    .option("initialPositionInStream", "earliest") 
    .option("format", "json") 
    .schema(<my-schema>) 
    .load 

データが一意を持っているいくつかのIoTデバイスから来ているが、次のように私の接続があり、構造化されたストリーミング・フレームワークを使用してキネシスからデータを読み出すためにスパークストリーミングを使用しています次のようにID、私は、このIDによって、およびタイムスタンプフィールドの上にタンブリングウィンドウでデータを集計する必要があります。

val aggregateData = kinesis 
    .groupBy($"uid", window($"timestamp", "15 minute", "15 minute")) 
    .agg(...) 

私が遭遇してる問題は、私はすべてのウィンドウは、ラウンドの時間に開始することを保証する必要があるということです(00:00:00,00:15:00など)、また、フル15分長い窓を含む行だけが私のシンクに出力することとしていることをダ保証は、私が現在やっていることはTHS postgreSQLWriterは、私は、PostgreSQLに各行を挿入するために作成したのStreamWriterである

val query = aggregateData 
    .writeStream 
     .foreach(postgreSQLWriter) 
     .outputMode("update") 
     .start() 
     .awaitTermination() 

ですSGBD。どのようにして私のウインドウを正確に15分長くすることができ、開始時間は各デバイス固有のIDの丸め15分のタイムスタンプ値になるのですか?

答えて

1

質問1: 開始するのに特定の時間に開始するには、「オフセット」というスパークグループ化関数が使用するパラメータがもう1つあります。 COLUMN1によって

dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute")) 

ので、上記の構文意志基と1分のウィンドウサイズを摺動して22分の持続時間のウィンドウを作成し、15のようにオフセット:それは時間 例から指定された時間後に開始することを指定することにより これはカウント列を作成し、 完全な15分の大きさのウィンドウだけをプッシュする:

window1: 8:15(8:00 add 15 minute offset) to 8:37 (8:15 add 22 minutes) 
window2: 8:16(previous window start + 1 minute) to 8:38 (22 minute size again) 

question2:分

は、例えばそれはから始まりますそのウィンドウ内にあるイベントの数を数えます。それが15に達すると、あなたは、カウント計算filterコマンド

を使って好きな場所にそれをプッシュする:

dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute")).agg(count*$"Column2").as("count")) 

writestreamフィルター含む数15のみ:

aggregateddata.filter($"count"===15).writeStream.format(....).outputMode("complete").start() 
関連する問題