2016-09-19 6 views
0

Flink CEPライブラリを使用してHelloとworldが見つかった場合は、文字列を出力しようとしています。私のソースはKafkaで、コンソールプロデューサーを使ってデータを入力しています。その部分は機能しています。私はトピックに入力したものを印刷することができます。しかし、それは私の最終的なメッセージを印刷しません "世界はとてもいいです!"それはラムダに入ったということさえも印刷しません。以下はクラスですFlink CEP No Results Printed

package kafka; 

import org.apache.flink.cep.CEP; 
import org.apache.flink.cep.PatternStream; 
import org.apache.flink.cep.pattern.Pattern; 
import org.apache.flink.streaming.api.TimeCharacteristic; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.windowing.time.Time; 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema; 
import org.apache.flink.util.Collector; 

import java.util.Map; 
import java.util.Properties; 

/** 
* Created by crackerman on 9/16/16. 
*/ 
public class WordCount { 

public static void main(String[] args) throws Exception { 

    Properties properties = new Properties(); 
    properties.put("bootstrap.servers", "localhost:9092"); 
    properties.put("zookeeper.connect", "localhost:2181"); 
    properties.put("group.id", "test"); 
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); 
    see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

    DataStream<String> src = see.addSource(new FlinkKafkaConsumer08<>("complexString", 
                     new SimpleStringSchema(), 
                     properties)); 

    src.print(); 


    Pattern<String, String> pattern = Pattern.<String>begin("first") 
      .where(evt -> evt.contains("Hello")) 
      .followedBy("second") 
      .where(evt -> evt.contains("World")); 

    PatternStream<String> patternStream = CEP.pattern(src, pattern); 

    DataStream<String> alerts = patternStream.flatSelect(
      (Map<String, String> in, Collector<String> out) -> { 
       System.out.println("Made it to the lambda"); 
       String first = in.get("first"); 
       String second = in.get("second"); 
       System.out.println("First: " + first); 
       System.out.println("Second: " + second); 

       if (first.equals("Hello") && second.equals("World")) { 

        out.collect("The world is so nice!"); 
       } 


      }); 

    alerts.print(); 

    see.execute(); 
} 

} 

ご協力いただければ幸いです。

ありがとうございます!

答えて

0

問題は、次の行

see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

それが削除された場合、それは私はそれが期待されるように動作しています。

+1

タイムスタンプとウォーターマークを割り当てましたか?イベント時間モードは、各レコードの時間と経過時間をFlinkに通知した場合にのみ機能します。この情報がなければ、Flinkはイベント時にデータを処理することができません。データが不通になった場合や時間ベースの操作(ウィンドウなど)がある場合は、一貫した結果を得るためにイベント時間が必要です。 –

+1

私はタイムスタンプと透かしを割り当てませんでした。私は戻ってこれを実装しました。情報をありがとう! – Crackerman

関連する問題