2017-06-05 4 views
0

私はスパークワールドを初めて使用しています。 タイプ以下の(タプルの形式で)タイムベースDSTREAMを受信気象ストリーミングアプリケーションを有する:Dstreamの現在のバッチ処理中に過去N秒のデータを取得する方法

{ 
    timestamp, 
    { v1, v2, v3} 
} 

V1 = TEMP、V2 =湿度、V3 = windrate

例:{ 01/05/2016 08:01:00, { 25,4.2,10}}

今私の必要条件は、すべての着信レコードの温度をチェックすることです。それが35を超えたら、処理を停止し、最後の10分のデータをhdfsにダンプしてさらに調査します。

+0

ヘルプ/ガイダンスは大変ありがとうございます – rndpavan

答えて

0

目標は、すべてのデータを保存する場合は、最後の10分を受け取ったあなたは35より一時高い得るとき、あなたは、これらの手順を実行する必要があり:

10分のウィンドウ値とのDSTREAMとスライド期間を作成します。たとえば、1秒のdstream.window(分(10)、秒(1))です。

温度が毎秒受信されるかどうかをチェックし、temp> 30の場合、HDFSに保存します(batch_timeから受信したすべてのデータを含む - batch_timeに10分)。

+0

あなたの親切な助けに感謝します。 – rndpavan

+0

私は提案された解決策を試しました。 ちょうどそれが動作しているかどうかを確認するためにサンプルコードを開始しました。ソケットが温度値である場合、私は(k、v)を渡して、30より大きいかどうかを確認します。 – rndpavan

+0

val v1 = input.map {x => )、x(1))} val windowDStream = v1.window(WINDOW_LENGTH、SLIDE_INTERVAL); v1.foreachRDD(rdd => rdd.foreach(x => if(x._2.toInt> 30){ println( "key" + x._1 + "値" + x._2) windowDStream。 saveAsTextFiles( "D:\\ output"、 "txt") })) – rndpavan

関連する問題