0
Flink CEPライブラリを使用してHelloとworldが見つかった場合は、文字列を出力しようとしています。私のソースはKafkaで、コンソールプロデューサーを使ってデータを入力しています。その部分は機能しています。私はトピックに入力したものを印刷することができます。しかし、それは私の最終的なメッセージを印刷しません "世界はとてもいいです!"それはラムダに入ったということさえも印刷しません。以下はクラスですFlink CEP No Results Printed
package kafka;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import java.util.Map;
import java.util.Properties;
/**
* Created by crackerman on 9/16/16.
*/
public class WordCount {
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("zookeeper.connect", "localhost:2181");
properties.put("group.id", "test");
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> src = see.addSource(new FlinkKafkaConsumer08<>("complexString",
new SimpleStringSchema(),
properties));
src.print();
Pattern<String, String> pattern = Pattern.<String>begin("first")
.where(evt -> evt.contains("Hello"))
.followedBy("second")
.where(evt -> evt.contains("World"));
PatternStream<String> patternStream = CEP.pattern(src, pattern);
DataStream<String> alerts = patternStream.flatSelect(
(Map<String, String> in, Collector<String> out) -> {
System.out.println("Made it to the lambda");
String first = in.get("first");
String second = in.get("second");
System.out.println("First: " + first);
System.out.println("Second: " + second);
if (first.equals("Hello") && second.equals("World")) {
out.collect("The world is so nice!");
}
});
alerts.print();
see.execute();
}
}
ご協力いただければ幸いです。
ありがとうございます!
タイムスタンプとウォーターマークを割り当てましたか?イベント時間モードは、各レコードの時間と経過時間をFlinkに通知した場合にのみ機能します。この情報がなければ、Flinkはイベント時にデータを処理することができません。データが不通になった場合や時間ベースの操作(ウィンドウなど)がある場合は、一貫した結果を得るためにイベント時間が必要です。 –
私はタイムスタンプと透かしを割り当てませんでした。私は戻ってこれを実装しました。情報をありがとう! – Crackerman