2017-09-22 2 views
0

Siddhiクエリでは、2つのストリームS1とS2をインポートしています。私がS1ストリームで受け取った場合、私はイベントテーブルT1に挿入し、S2で受け取ると、IDに基づいてT1テーブルを更新し、更新された値をテーブルから出力ストリームO1に送信します。Siddhi - 特定の時間内に更新されないイベントテーブルから取得する

要件の一部として、5分前(レコードが5分以上存在する場合)に挿入されるテーブルT1のコンテンツを取得し、別の出力ストリームO2に送信する必要があります。

@name('S1') 
from S1 
select id, srcId, 'null' as msgId, 'INP' as status 
insert into StatusTable; 

@name('S2') 
from S2#window.time(1min) as g join StatusTable[t.status == 'INP'] as t 
on (g.srcId == t.id) 
select t.id as id, g.msgId as msgId, 'CMP' as status 
update StatusTable on TradeStatusTable.id == id; 

@name('Publish') 
from S2 as g join StatusTable[t.status == 'CMP'] as t on (g.srcId == t.id and t.status == 'CMP') 
select t.id as id, t.msgId as msgId, t.status as status 
insert into O1; 

この既存のクエリにクエリを追加して、5分を超えるレシートのTradeStatusテーブルからレコードをフェッチする方法。テーブルは単独では使用できないので、ストリームに参加する必要があります。このシナリオを実行する方法は?

+0

あなたはstatusTableから5分以上更新されないレコードを希望しますか? – dnWick

+0

はい、私は5分以上別のストリームで送信する必要が更新されていない必要があります。 – gpk

答えて

0
String WebAttackSuccess = "" + 
      "@info(name = 'found_host_charged1') "+ 
      "from ATDEventStream[ rid == 10190001 ]#window.timeBatch(10 sec) as a1 "+ 
      "join ATDEventStream[ rid == 10180004 ]#window.time(10 sec) as a2 on a2.src_ip == a1.src_ip and a2.dst_ip == a1.dst_ip " + 
      " select UUID() as uuid,1007 as cid,a1.sensor_id as sensor_id,a1.interface_id as interface_id,a1.other_id as other_id,count(a1.uuid) as event_num,min(a1.timestamp) as first_seen,max(a2.timestamp) as last_seen,'' as IOC,a1.dst_ip as victim,a1.src_ip as attacker,a1.uuid as NDE4,sample:sample(a2.uuid) as Sample_NDE4 " + 
      " insert into found_host_charged1;"+ 
      ""+ 
      "@info(name = 'found_host_charged2') "+ 
      "from every a1 = found_host_charged1 " + 
      "-> a2 = ATDEventStream[dns_answers != ''] "+ 
      "within 5 min "+ 
      "select UUID() as uuid,1008 as cid,a2.sensor_id as sensor_id,a2.interface_id as interface_id,a2.other_id as other_id,count(a2.uuid) as event_num,a1.first_seen as first_seen,max(a2.timestamp) as last_seen,a2.dns_answers as IOC,a2.dst_ip as victim,a2.src_ip as attacker,a1.uuid as NDE5,sample:sample(a2.uuid) as Sample_NDE5 " + 
      "insert into found_host_charged2; "; 

これは多分あなたはまだ解決されていない2番目のstream.IfにStatusTableからデータを取得することができ、あなたがS1にStatusTableを変更することができ、私は2つのストリームを使用して、私の仕事の一部です。

関連する問題