0

私はキネシス・アナリティクスで実験だし、それには多くの問題を解決しましたが、実際に立ち往生している以下:計算デバイス秒

を私は実際にデバイスをオンにしたとき反映し、レコードとストリームを持っていますオフのような:

device_id | timestamp | reading 1 | 2011/09/01 22:30 | 1 1 | 2011/09/01 23:00 | 0 1 | 2011/09/02 03:30 | 1 1 | 2011/09/02 03:31 | 0

  • 私はreadingフィールドにオフのためにのための1と0を使用しています。もし

    device_id | timestamp | reading 1 | 2011/09/01 22:35 | 300 1 | 2011/09/01 22:40 | 300 1 | 2011/09/01 22:45 | 300 1 | 2011/09/01 22:50 | 300 1 | 2011/09/01 22:55 | 300 1 | 2011/09/01 23:00 | 300 1 | 2011/09/01 23:05 | 0 1 | 2011/09/01 23:10 | 0 ...

    わからない:

は私が達成しようとしていることのように見えるデバイスが別のストリームに5分ごとにウィンドウにされている秒数をリダイレクトPUMPを作成することですこれはKinesis Analyticsで達成できるものですが、実際にSQLテーブルをクエリすることはできますが、ストリーミングデータであるという事実に悩まされています。

答えて

1

これはDrools Kinesis Analytics(Amazonでマネージドサービス)で可能です:

タイプ:

package com.text; 

import java.util.Deque; 

declare EventA 
    @role(event) 
    id: int; 
    timestamp: long; 
    on: boolean; 

    //not part of the message 
    seen: boolean; 
end 

declare Session 
    id: int @key; 
    events: Deque; 
end 

declare Report 
    id: int @key; 
    timestamp: long @key; 
    onInLast5Mins: int; 
end 

ルール:

package com.text; 

import java.util.Deque; 
import java.util.ArrayDeque; 

declare enum Constants 

    // 20 seconds - faster to test 
    WINDOW_SIZE(20*1000); 

    value: int; 
end 

rule "Reporter" 
    // 20 seconds - faster to test 
    timer(cron:0/20 * * ? * * *) 
when 
    $s: Session() 
then 
    long now = System.currentTimeMillis(); 

    int on = 0; //how long was on 
    int off = 0; //how long was off 
    int toPersist = 0; //last interesting event 

    for (EventA a : (Deque<EventA>)$s.getEvents()) { 
     toPersist ++; 
     boolean stop = false; 
     // time elapsed since the reading till now 
     int delta = (int)(now - a.getTimestamp()); 
     if (delta >= Constants.WINDOW_SIZE.getValue()) { 
      delta = Constants.WINDOW_SIZE.getValue(); 
      stop = true; 
     } 

     // remove time already counted 
     delta -= (on+off); 
     if (a.isOn()) 
      on += delta; 
     else 
      off += delta; 

     if (stop) 
      break; 
    } 

    int toRemove = $s.getEvents().size() - toPersist; 
    while (toRemove > 0) { 
     // this event is out of window of interest - delete 
     delete($s.getEvents().removeLast()); 
     toRemove --; 
    } 

    insertLogical(new Report($s.getId(), now, on)); 
end 

rule "SessionCreate" 
when 
    // for every new EventA 
    EventA(!seen, $id: id) from entry-point events 
    // check there is no session 
    not (exists(Session(id == $id))) 
then 
    insert(new Session($id, new ArrayDeque())); 
end 

rule "SessionJoin" 
when 
    // for every new EventA 
    $a : EventA(!seen) from entry-point events 
    // get event's session 
    $g: Session(id == $a.id) 
then 
    $g.getEvents().push($a); 
    modify($a) { 
     setSeen(true), 
     setTimestamp(System.currentTimeMillis()) 
    }; 
end 
+0

私はDroolsをチェックしていましたが、かなりクールだと思いますが、SQLを使用してこれを実現したいと考えています。 – codeadict

0

あなたはStride HTTP APIで、この使用してSQLを行うことができます。連続SQL問合せのネットワークを連鎖させ、変更のストリームをサブスクライブしたり、リアルタイムWebhookを起動したりすることができます。詳細については、Stride API docsを参照してください。

関連する問題