2017-09-12 9 views
1

同じ識別子を持つ2つのイベントに基づいて、定義された時間枠内に2つのイベントが発生したかどうかを検出したいと考えています。たとえば、DoorEvent次のようになります。例ではステートフル複雑なイベント処理とapache flink

<doorevent> 
    <door> 
    <id>1</id> 
    <status>open</status> 
    </door> 
    <timestamp>12345679</timestamp> 
</doorevent> 

<doorevent> 
    <door> 
    <id>1</id> 
    <status>close</status> 
    </door> 
    <timestamp>23456790</timestamp> 
</doorevent> 

マイDoorEvent Javaクラスは、以下の同様の構造を有しています。

開けて5分以内にid1のドアが閉じていることを検出したいと思います。私は、この目的のためにApache flink CEPライブラリを使用しようとしています。入ってくるストリームには、20個のドアからのすべての開いたメッセージと閉じたメッセージが含まれています。私はそのdoor_closeステップで、私はドア1が閉鎖されている一つであり、それはいくつかの他のドアではないことを知っているdoor_openのように開いたドア1の状態を保存するにはどうすればよい

Pattern<String, ?> pattern = Pattern.<String>begin("door_open").where(
    new SimpleCondition<String>() { 
     private static final long serialVersionUID = 1L; 
     public boolean filter(String doorevent) { 
      DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML); 
      if (event.getDoor().getStatus().equals("open")){ 
       // save state of door as open 
       return true; 
      } 
      return false;       
     } 
    } 
) 
.followedByAny("door_close").where(
    new SimpleCondition<String>() { 
      private static final long serialVersionUID = 1L; 
      public boolean filter(String doorevent) throws JsonParseException, JsonMappingException, IOException { 
       DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML); 
       if (event.getDoor().getStatus().equals("close")){ 
        // check if close is of previously opened door 
        return true; 
       } 
       return false; 
      } 
     } 
) 
.within(Time.minutes(5)); 

答えて

2

あなたはFLINK 1.3.0持っており、その本当にあなたが

あなたのパターンは、このようなものになります何をしたいのかstraightforard上記の場合:だから、基本的にあなたがIterativeConditionsを使用すると、コンテキストを取得することができます

Pattern.<DoorEvent>begin("first") 
     .where(new SimpleCondition<DoorEvent>() { 
      private static final long serialVersionUID = 1390448281048961616L; 

      @Override 
      public boolean filter(DoorEvent event) throws Exception { 
      return event.getDoor().getStatus().equals("open"); 
      } 
     }) 
     .followedBy("second") 
     .where(new IterativeCondition<DoorEvent>() { 
      private static final long serialVersionUID = -9216505110246259082L; 

      @Override 
      public boolean filter(DoorEvent secondEvent, Context<DoorEvent> ctx) throws Exception { 

      if (!secondEvent.getDoor().getStatus().equals("close")) { 
       return false; 
      } 

      for (DoorEvent firstEvent : ctx.getEventsForPattern("first")) { 
       if (secondEvent.getDoor().getEventID().equals(firstEvent.getDoor().getEventId())) { 
       return true; 
       } 
      } 
      return false; 
      } 
     }) 
     .within(Time.minutes(5)); 

を照合された最初のパターンに対して、必要なものと比較しながらそのリストを反復し、必要に応じて処理を進めます。

IterativeConditionsは高価であり、条件の詳細については応じ

扱うべきFlink - Conditions

でこちらをご確認ください
関連する問題