2017-05-03 10 views
0

トピックのリストで単一のカフカコンシューマを使用して2つのカフカトピックを結合しようとしています。ストリームのjson文字列をさらにPOJOに変換します。その後、keyBy(On event time field)を介してそれらを結合し、単一の太ったjsonとしてそれらをマージするために、私はウィンドウストリームを使用し、ウィンドウストリームにウィンドウ関数を適用することを計画していました。前提条件は、Topic-A & Topic-BをEvent Timeで結合することができ、1つのペア(Topic A(JSON)、Topic B(JSON))が同じeventTimeで存在することを前提としています。 )イベント時刻にkeyBy投稿Apache Flink:AllWindowedストリームのウィンドウ機能 - カフカトピックの結合

を私は同じのための質問のカップルを持っている;。

  1. は、トピックをマージして、単一のJSONを作成するためのアプローチの罰金です
  2. ウィンドウ関数をすべてのウィンドウストリームdoesntのにように見えます?どんな指針も大変ありがとうございます。

コードスニペット:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

logger.info("Flink Stream Window Charger has started"); 

Properties properties = new Properties(); 

properties.setProperty("bootstrap.servers", "127.0.0.1:1030"); 

properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka"); 

properties.setProperty("group.id", "group-0011"); 

properties.setProperty("auto.offset.reset", "smallest"); 



List <String> names = new ArrayList < >(); 



names.add("Topic-A"); 

names.add("Topic-B"); 



DataStream <String> stream = env.addSource(new FlinkKafkaConsumer08 < > (names, new SimpleStringSchema(), properties)); 

DataStream <TopicPojo> pojo = stream.map(new Deserializer()).keyBy((eventTime) -> TopicPojo.getEventTime()); 

List <String> where = new ArrayList <String>(); 

AllWindowedStream < String, GlobalWindow > data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2); 

DataStream <String> data_charging = data_window.apply(new MyWindowFunction()); 

data_charging.addSink(new SinkFunction <String>() { 



public void invoke(String value) throws Exception { 



    // Yet to be implemented - Merge two POJO into one 

} 

}); 



try 

{ 

env.execute(); 

} catch (Exception e) 

{ 

return; 

} 

} 

} 

class Tokenizer implements FlatMapFunction < TopicPojo, String > { 

private static final long serialVersionUID = 1 L; 

@Override 

public void flatMap(TopicPojo value, Collector <String> out) throws Exception { 

    ObjectMapper mapper = new ObjectMapper(); 

    out.collect(mapper.writeValueAsString(value)); 

} 

} 

class MyWindowFunction implements WindowFunction < TopicPojo, String, String, GlobalWindow > { 

@Override 

public void apply(String key, GlobalWindow window, Iterable <TopicPojo> arg2, Collector <String> out) 

throws Exception { 

    int count = 0; 

    for (TopicPojo in : arg2) { 

    count++; 

    } 

    // Test Result - TO be modified 

    out.collect("Window: " + window + "count: " + count); 



} 

} 

class Deserializer implements MapFunction < String, TopicPojo > { 

private static final long serialVersionUID = 1 L; 

@Override 

public TopicPojo map(String value) throws IOException { 

    // TODO Auto-generated method stub 

    ObjectMapper mapper = new ObjectMapper(); 

    TopicPojo obj = null; 

    try { 



    System.out.println(value); 



    obj = mapper.readValue(value, TopicPojo.class); 



    } catch (JsonParseException e) { 



    // TODO Auto-generated catch block 



    throw new IOException("Failed to deserialize JSON object."); 



    } catch (JsonMappingException e) { 



    // TODO Auto-generated catch block 



    throw new IOException("Failed to deserialize JSON object."); 

    } catch (IOException e) { 



    // TODO Auto-generated catch block 



    throw new IOException("Failed to deserialize JSON object."); 

    } 

    return obj; 

} 

} 

私は取得しています - 型AllWindowedStreamで

法適用(AllWindowFunction)は引数(MyWindowFunction)エラーには適用されません。

答えて

1

AllWindowedStreamはキーストリームではないため、AllWindowedStreamsのapplyメソッドにはキーパラメータはありません。キー付きストリームをウィンドウ処理しているので、data_windowはKeyedStreamでなければなりません。

関連する問題