2016-07-28 17 views
3

私はflinkでkafkaを使用しています。 シンプルなプログラムでは、Flinks FlinkKafkaConsumer09を使用し、グループIDを割り当てました。flink kafka消費者グループが動作していません

カフカの動作によれば、同じgroup.Idを持つ同じトピックの2つのコンシューマを実行すると、メッセージキューのように動作するはずです。私はそれが次のように動作すると考えています: 2つのメッセージがKafkaに送信された場合、Flinkプログラムのそれぞれが2つのメッセージを完全に2回処理します(合計2行の出力を考えてみましょう)。

しかし実際には、各プログラムは2つのメッセージを受信します。

kafkaサーバーのダウンロードに付属しているコンシューマークライアントを使用しようとしました。それは文書化された方法で働いた(2つのメッセージが処理された)。
私はフリンクプログラマの同じメイン機能で2人のカフカ消費者を使用しようとしました。 4つのメッセージが完全に処理されました。
私はまた、2つのインスタンスのflinkを実行しようとし、それらのそれぞれにkafka consumerの同じプログラムを割り当てました。 4メッセージ。

アイデア? これは私が期待して出力されます:

1> Kafka and Flink2 says: element-65 
2> Kafka and Flink1 says: element-66 

は、ここで私は常に取得間違った出力です:

1> Kafka and Flink2 says: element-65 
1> Kafka and Flink1 says: element-65 
2> Kafka and Flink2 says: element-66 
2> Kafka and Flink1 says: element-66 

そしてここでは、コードのセグメントである:

public static void main(String[] args) throws Exception { 

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

    ParameterTool parameterTool = ParameterTool.fromArgs(args); 

    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties())); 

    messageStream.rebalance().map(new MapFunction<String, String>() { 
     private static final long serialVersionUID = -6867736771747690202L; 

     @Override 
     public String map(String value) throws Exception { 
      return "Kafka and Flink1 says: " + value; 
     } 
    }).print(); 


    env.execute(); 
} 

私が実行しようとしていますそれは2回、そして逆もまたあります: main関数内にそれぞれ2つのデータストリームとenv.execute()を作成します。

+0

私はまた、2人の消費者がflink-kafka-connectorを使用して、flinkの外側のkafkaクライアントを使用して2人の消費者をflinkインスタンス内で実行してみました。 2人の外の消費者が正しく働いているようです(合計2人)。しかし、他の2つのフリンクは独立して働いているように見えました(外部にも、お互いにも)、それぞれメッセージが2つ、合計で4つです。 – PleaseLetMeGo

答えて

3

今日のFlinkユーザーメーリングリストでは非常によく似た質問がありましたが、ここに投稿するリンクが見つかりません。したがって、答えの一部:

"内部的に、Flink Kafkaコネクタは低レベルのAPI(SimpleConsumer 0.8、およびKafkaConsumer#assign(...)in 0.9を使用しているため、コンシューマグループ管理機能を使用しません。 )を使用して、個々のパーティションの消費量をより細かく制御することができます。つまり、Flink Kafkaコネクタのgroup.id設定は、ZK/Kafkaブローカへのオフセットをコミットするためにのみ使用されます。

多分あなたのために物事を明確にします。

また、あなたに役立つかもしれないFlinkとKafkaと一緒に働いているブログ投稿apoutがあります(http://data-artisans.com/kafka-flink-a-practical-how-to/)。

+0

ありがとう。私は最終的にフリンクコネクタのソースコードを確認します。通常のカフカクライアントよりも別のメッセージ処理経路を使用します。 – PleaseLetMeGo

+0

btwがメーリングリストの質問へのリンクを見つけました:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-group-question-td8185.html#none – Claudi

0

リンゴカフカ消費者のgroup.idは、飼い猫にオフセットする以外にあまり使われていないので、フリフカフカ消費者が懸念している限り、オフセットモニタリングの方法はありますか?コンシューマー向けの消費者グループ/消費者オフセット・チェッカーの助けを借りて、フリンクカフカの消費者にとっては道があるのがわかりました。

私たちのフリンクカフカ消費者がカフカのトピックのサイズ(特定の時点でのトピック内のメッセージの総数)の後ろに/遅れているのを見たいと思っています。

+0

Isこれは答えか質問ですか? – derekv

関連する問題