2016-11-03 11 views
1

Flink DataStreamの各メッセージをどのように反復処理しますか?

DataStream<String> messageStream = env 
    .addSource(new FlinkKafkaConsumer09<>(topic, new MsgPackDeserializer(), props)); 

がどのように私は、ストリーム内の各メッセージを反復し、それに何かを行うことができますように私はカフカからのメッセージストリームを持っていますか? DataStreamiterate()メソッドがありますが、Iterator<String>は返されません。

答えて

4

私はあなたがMapFunctionを探していると思います。

DataStream<String> messageStream = env.addSource(
    new FlinkKafkaConsumer09<>(topic, new MsgPackDeserializer(), props)); 

DataStream<Y> mappedMessages = messageStream 
    .map(new MapFunction<String, Y>() { 
    public Y map(String message) { 
     // do something with each message and return Y 
    } 
    }); 

あなたは、各受信メッセージに対して正確に一つのレコードを放出したくない場合は、FlatMapFunctionを見ています。

関連する問題