のために事前にありがとうあなたのキーのタイプ。このコードは、エラーなし(私はダミーのコードで空白を埋めるためにしようとした考慮に入れる)をコンパイルします。
public class Test {
public Test() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> events = env.readTextFile("datastream.log");
DataStream<Tuple2<String, JSONObject>> MetaAlert
= events
.flatMap(new JSONParser())
.keyBy(0)
.timeWindow(Time.seconds(5))
.apply(new GenerateMetaAlert());
}
public class JSONObject {
}
public class JSONParser implements FlatMapFunction<String, Tuple2<String, JSONObject>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
}
}
public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow> {
@Override
public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
}
}
}
しかし、最も簡単な方法は、あなたがString
タイプを保つことができるように、匿名クラスを使用することです。
DataStream<Tuple2<String, JSONObject>> MetaAlert
= events
.flatMap(new JSONParser())
.keyBy(0)
.timeWindow(Time.seconds(5))
.apply(new WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
// Your code here
}
});
クラスを維持したいが、あなたはまた、それがあるように、キーあなたのタイプを保持したい場合は最後に、あなたがKeySelector
を実装することができます
public class Test {
public Test() {
DataStream<Tuple2<String, JSONObject>> MetaAlert
= events
.flatMap(new JSONParser())
.keyBy(new KeySelector<Tuple2<String,JSONObject>, String>() {
@Override
public String getKey(Tuple2<String, JSONObject> json) throws Exception {
return json.f0;
}
})
.timeWindow(Time.seconds(5))
.apply(new GenerateMetaAlert());
}
public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> {
@Override
public void apply(String key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
}
}
}
私はこれがdupliだと思います[この質問](https://stackoverflow.com/questions/47033981/probleme-with-apply-function-windowfunction-in-flink)のケイト。答えがあなたの問題を解決する場合は、チェックして終了してください。 –