トピックのリストで単一のカフカコンシューマを使用して2つのカフカトピックを結合しようとしています。ストリームのjson文字列をさらにPOJOに変換します。その後、keyBy(On event time field)を介してそれらを結合し、単一の太ったjsonとしてそれらをマージするために、私はウィンドウストリームを使用し、ウィンドウストリームにウィンドウ関数を適用することを計画していました。前提条件は、Topic-A & Topic-BをEvent Timeで結合することができ、1つのペア(Topic A(JSON)、Topic B(JSON))が同じeventTimeで存在することを前提としています。 )イベント時刻にkeyBy投稿Apache Flink:AllWindowedストリームのウィンドウ機能 - カフカトピックの結合
を私は同じのための質問のカップルを持っている;。
- は、トピックをマージして、単一のJSONを作成するためのアプローチの罰金です
- ウィンドウ関数をすべてのウィンドウストリームdoesntのにように見えます?どんな指針も大変ありがとうございます。
コードスニペット:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
logger.info("Flink Stream Window Charger has started");
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
properties.setProperty("group.id", "group-0011");
properties.setProperty("auto.offset.reset", "smallest");
List <String> names = new ArrayList < >();
names.add("Topic-A");
names.add("Topic-B");
DataStream <String> stream = env.addSource(new FlinkKafkaConsumer08 < > (names, new SimpleStringSchema(), properties));
DataStream <TopicPojo> pojo = stream.map(new Deserializer()).keyBy((eventTime) -> TopicPojo.getEventTime());
List <String> where = new ArrayList <String>();
AllWindowedStream < String, GlobalWindow > data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2);
DataStream <String> data_charging = data_window.apply(new MyWindowFunction());
data_charging.addSink(new SinkFunction <String>() {
public void invoke(String value) throws Exception {
// Yet to be implemented - Merge two POJO into one
}
});
try
{
env.execute();
} catch (Exception e)
{
return;
}
}
}
class Tokenizer implements FlatMapFunction < TopicPojo, String > {
private static final long serialVersionUID = 1 L;
@Override
public void flatMap(TopicPojo value, Collector <String> out) throws Exception {
ObjectMapper mapper = new ObjectMapper();
out.collect(mapper.writeValueAsString(value));
}
}
class MyWindowFunction implements WindowFunction < TopicPojo, String, String, GlobalWindow > {
@Override
public void apply(String key, GlobalWindow window, Iterable <TopicPojo> arg2, Collector <String> out)
throws Exception {
int count = 0;
for (TopicPojo in : arg2) {
count++;
}
// Test Result - TO be modified
out.collect("Window: " + window + "count: " + count);
}
}
class Deserializer implements MapFunction < String, TopicPojo > {
private static final long serialVersionUID = 1 L;
@Override
public TopicPojo map(String value) throws IOException {
// TODO Auto-generated method stub
ObjectMapper mapper = new ObjectMapper();
TopicPojo obj = null;
try {
System.out.println(value);
obj = mapper.readValue(value, TopicPojo.class);
} catch (JsonParseException e) {
// TODO Auto-generated catch block
throw new IOException("Failed to deserialize JSON object.");
} catch (JsonMappingException e) {
// TODO Auto-generated catch block
throw new IOException("Failed to deserialize JSON object.");
} catch (IOException e) {
// TODO Auto-generated catch block
throw new IOException("Failed to deserialize JSON object.");
}
return obj;
}
}
私は取得しています - 型AllWindowedStreamで
法適用(AllWindowFunction)は引数(MyWindowFunction)エラーには適用されません。