2017-07-11 29 views
1

私はソケットから読み取り、パターンを検出するフリンクcepコードを持っています。パターン(単語)が「警告」であると言います。アラートが5回以上発生する場合は、アラートを作成する必要があります。しかし、入力ミスマッチエラーが発生しています。 Flinkのバージョンは1.3.0です。前もって感謝します !!Flink複雑なイベント処理

package pattern; 

import org.apache.flink.cep.CEP; 
import org.apache.flink.cep.PatternStream; 
import org.apache.flink.cep.pattern.Pattern; 
import org.apache.flink.cep.pattern.conditions.IterativeCondition; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.datastream.DataStreamSource; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.util.Collector; 

import java.util.List; 
import java.util.Map; 

    public class cep { 

     public static void main(String[] args) throws Exception { 


      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

       DataStreamSource<String> dss = env.socketTextStream("localhost", 3005); 

       dss.print(); 

       Pattern<String,String> pattern = Pattern.<String> begin("first") 
         .where(new IterativeCondition<String>() { 
          @Override 
          public boolean filter(String word, Context<String> context) throws Exception { 
           return word.equals("alert"); 
          } 
         }) 
         .times(5); 


       PatternStream<String> patternstream = CEP.pattern(dss, pattern); 

       DataStream<String> alerts = patternstream 
         .flatSelect((Map<String,List<String>> in, Collector<String> out) -> { 

          String first = in.get("first").get(0); 

          for (int i = 0; i < 6; i++) { 

           out.collect(first); 

          } 


         }); 

       alerts.print(); 

       env.execute(); 

      } 

    } 

enter image description here

+0

私は動作するコードを持っています。いくつかの変更を加えました。 –

答えて

0

だから私は動作するようにコードを持っています。ここには、実際の解決策があります。

package pattern; 

    import org.apache.flink.cep.CEP; 
    import org.apache.flink.cep.PatternSelectFunction; 
    import org.apache.flink.cep.PatternStream; 
    import org.apache.flink.cep.pattern.Pattern; 
    import org.apache.flink.cep.pattern.conditions.IterativeCondition; 
    import org.apache.flink.streaming.api.datastream.DataStream; 
    import org.apache.flink.streaming.api.datastream.DataStreamSource; 
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
    import org.apache.flink.util.Collector; 

    import java.util.List; 
    import java.util.Map; 

    public class cep { 

     public static void main(String[] args) throws Exception { 


      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

       DataStreamSource<String> dss = env.socketTextStream("localhost", 3005); 

       dss.print(); 

       Pattern<String,String> pattern = Pattern.<String> begin("first") 
         .where(new IterativeCondition<String>() { 
          @Override 
          public boolean filter(String word, Context<String> context) throws Exception { 
           return word.equals("alert"); 
          } 
         }) 
         .times(5); 

       PatternStream<String> patternstream = CEP.pattern(dss, pattern); 

       DataStream<String> alerts = patternstream 
         .select(new PatternSelectFunction<String, String>() { 
          @Override 
          public String select(Map<String, List<String>> in) throws Exception { 

           String first = in.get("first").get(0); 

           if(first.equals("alert")){ 

            return ("5 or more alerts"); 
           } 
           else{ 

            return (" "); 
           } 
          } 
         }); 

       alerts.print(); 

       env.execute(); 

      } 

    } 
1

元の問題をちょっと解説します。 1.3.0では、ラムダをselect/flatSelectへの引数として使用できなくなったバグがありました。

1.3.1で修正されているため、コードの最初のバージョンは1.3.1で動作します。


さらに、times量限定記号を間違って解釈したと思います。正確な回数に一致します。したがって、あなたのケースでは、イベントが3回以上ではなく、正確に3回一致する場合にのみ返されます。

+0

ちょっと私は1.3.0で動作するようなコードを持っています。ラムダ式を削除しました。以下の答えを確認してください。とにかくありがとう。それを感謝してください@Dawid Wysakowicz –

+0

はい、私は知っている、私はあなたのコードが働いた理由を明確にしたい。 –

+0

Cool @Dawid。解明してくれてありがとう!私はこのコードを掲示した時から時(5)に時間(3)を変えるのを忘れていました。おっと、私の間違い。今私はそれを編集しました。 –