2017-09-20 7 views
0

私はパターンを書きました。私は条件(jsonからgettinのルール)のリストを持っています.Data(json)は、フォームのカフカサーバーに来ています。このリストでデータをフィルタリングしたいのですが。しかし、それは動作していません。どうやってやるの? 私はキーストリームとアラームについてはわかりません。このような仕事をすることはできますか?Apache Flinkパターン条件リストあり

メインプログラム:

package cep_kafka_eample.cep_kafka; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import com.fasterxml.jackson.databind.node.ObjectNode; 
import com.google.gson.Gson; 
import com.google.gson.JsonArray; 
import com.google.gson.JsonParser; 
import org.apache.flink.cep.CEP; 
import org.apache.flink.cep.PatternSelectFunction; 
import org.apache.flink.cep.PatternStream; 
import org.apache.flink.cep.pattern.Pattern; 
import org.apache.flink.streaming.api.TimeCharacteristic; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; 
import org.apache.flink.streaming.api.windowing.time.Time; 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; 
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema; 
import util.AlarmPatterns; 
import util.Rules; 
import util.TypeProperties; 

import java.io.FileReader; 
import java.util.*; 

public class MainClass { 

    public static void main(String[] args) throws Exception 
    { 

     ObjectMapper mapper = new ObjectMapper(); 
     JsonParser parser = new JsonParser(); 
     Object obj = parser.parse(new FileReader(
       "c://new 5.json")); 
     JsonArray array = (JsonArray)obj; 
     Gson googleJson = new Gson(); 
     List<Rules> ruleList = new ArrayList<>(); 
     for(int i = 0; i< array.size() ; i++) { 
      Rules jsonObjList = googleJson.fromJson(array.get(i), Rules.class); 
      ruleList.add(jsonObjList); 
     } 

     //apache kafka properties 
     Properties properties = new Properties(); 
     properties.setProperty("zookeeper.connect", "localhost:2181"); 
     properties.setProperty("bootstrap.servers", "localhost:9092"); 

     //starting flink 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
     env.enableCheckpointing(1000).setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
     //get kafka values 
     FlinkKafkaConsumer010<ObjectNode> myConsumer = new FlinkKafkaConsumer010<>("demo", new JSONDeserializationSchema(), 
       properties); 
     List<Pattern<ObjectNode,?>> patternList = new ArrayList<>(); 
     DataStream<ObjectNode> dataStream = env.addSource(myConsumer); 
     dataStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))); 
     DataStream<ObjectNode> keyedStream = dataStream; 
     //get pattern list, keyeddatastream 
     for(Rules rules : ruleList){ 
      List<TypeProperties> typePropertiesList = rules.getTypePropList(); 
      for (int i = 0; i < typePropertiesList.size(); i++) { 
       TypeProperties typeProperty = typePropertiesList.get(i); 
       if (typeProperty.getGroupType() != null && typeProperty.getGroupType().equals("group")) { 
        keyedStream = keyedStream.keyBy(
          jsonNode -> jsonNode.get(typeProperty.getPropName().toString()) 
        ); 
       } 
      } 
      Pattern<ObjectNode,?> pattern = new AlarmPatterns().getAlarmPattern(rules); 
      patternList.add(pattern); 
     } 
     //CEP pattern and alarms 
     List<DataStream<Alert>> alertList = new ArrayList<>(); 
     for(Pattern<ObjectNode,?> pattern : patternList){ 
      PatternStream<ObjectNode> patternStream = CEP.pattern(keyedStream, pattern); 
      DataStream<Alert> alarms = patternStream.select(new PatternSelectFunction<ObjectNode, Alert>() { 
       private static final long serialVersionUID = 1L; 
       public Alert select(Map<String, List<ObjectNode>> map) throws Exception { 
        return new Alert("new message"); 
       } 
      }); 
      alertList.add(alarms); 
     } 
     env.execute("Flink CEP monitoring job"); 
    } 
} 

getAlarmPattern:

package util; 

import org.apache.flink.cep.pattern.Pattern; 
import org.apache.flink.cep.pattern.conditions.IterativeCondition; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import com.fasterxml.jackson.databind.node.ObjectNode; 

public class AlarmPatterns { 

    public Pattern<ObjectNode, ?> getAlarmPattern(Rules rules) { 

     //MySimpleConditions conditions = new MySimpleConditions(); 
     Pattern<ObjectNode, ?> alarmPattern = Pattern.<ObjectNode>begin("first") 
       .where(new IterativeCondition<ObjectNode>() { 
        @Override 
        public boolean filter(ObjectNode jsonNodes, Context<ObjectNode> context) throws Exception { 
         for (Criterias criterias : rules.getCriteriaList()) { 
          if (criterias.getCriteriaType().equals("equals")) { 
           return jsonNodes.get(criterias.getPropName()).equals(criterias.getCriteriaValue()); 
          } else if (criterias.getCriteriaType().equals("greaterThen")) { 
           if (!jsonNodes.get(criterias.getPropName()).equals(criterias.getCriteriaValue())) { 
            return false; 
           } 
           int count = 0; 
           for (ObjectNode node : context.getEventsForPattern("first")) { 
            count += node.get("value").asInt(); 
           } 
           return Integer.compare(count, 5) > 0; 
          } else if (criterias.getCriteriaType().equals("lessThen")) { 
           if (!jsonNodes.get(criterias.getPropName()).equals(criterias.getCriteriaValue())) { 
            return false; 
           } 
           int count = 0; 
           for (ObjectNode node : context.getEventsForPattern("first")) { 
            count += node.get("value").asInt(); 
           } 
           return Integer.compare(count, 5) < 0; 
          } 
         } 
         return false; 
        } 
       }).times(rules.getRuleCount()); 
     return alarmPattern; 
    } 
} 
+0

問題を詳細に説明できますか?具体的にはフィルタリングで「機能しない」とは何ですか?あなたは与えられた入力に対してどんな出力を期待していますか? – JBC

+0

私はパターンをデバッグすると、デバッガは関数をフィルタリングしていません。ではない?どう思いますか? – Erdem

+0

パターンをストリームにどのように適用しますか?ストリームの入力要素は何ですか?あなたは本当にありますか? –

答えて

0

FlinkCEPを使ってくれてありがとう!

エラーメッセージ(もしあれば)が正確に何であるかについての詳細を教えてください。これは問題を解決するのに大いに役立ちます。

コードを初めて目から、私は以下の観察することができます:まず

、ライン:

dataStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));

あなたがこのストリームを使用しないように、実行されることはありませんあなたのプログラムの残りの部分。

第2に、select()の後に取り込まれるシンクを指定する必要があります。 PatternStreamのそれぞれについてprint()メソッドを使用してください。そうしないと、出力が破棄されます。リストは網羅的ではありませんが、例としてはhereのように見えます。

最後に、メモリが不足しないように、within()句をパターンに追加することをお勧めします。

+0

アスター・コスタスに感謝します。私はなぜスライディング処理のウィンドウを使用できないのか理解できませんでしたか?私はプリントを使用し、その中で私は新しい投稿を書く – Erdem

0

エラーは私のjsonオブジェクトからのものです。私はそれを修正します。私がintellijでジョブを実行しているとき、cepは機能しません。 Flinkコンソールから送信すると動作します。

関連する問題