私はflinkでkafkaを使用しています。 シンプルなプログラムでは、Flinks FlinkKafkaConsumer09を使用し、グループIDを割り当てました。flink kafka消費者グループが動作していません
カフカの動作によれば、同じgroup.Idを持つ同じトピックの2つのコンシューマを実行すると、メッセージキューのように動作するはずです。私はそれが次のように動作すると考えています: 2つのメッセージがKafkaに送信された場合、Flinkプログラムのそれぞれが2つのメッセージを完全に2回処理します(合計2行の出力を考えてみましょう)。
しかし実際には、各プログラムは2つのメッセージを受信します。
kafkaサーバーのダウンロードに付属しているコンシューマークライアントを使用しようとしました。それは文書化された方法で働いた(2つのメッセージが処理された)。
私はフリンクプログラマの同じメイン機能で2人のカフカ消費者を使用しようとしました。 4つのメッセージが完全に処理されました。
私はまた、2つのインスタンスのflinkを実行しようとし、それらのそれぞれにkafka consumerの同じプログラムを割り当てました。 4メッセージ。
アイデア? これは私が期待して出力されます:
1> Kafka and Flink2 says: element-65
2> Kafka and Flink1 says: element-66
は、ここで私は常に取得間違った出力です:
1> Kafka and Flink2 says: element-65
1> Kafka and Flink1 says: element-65
2> Kafka and Flink2 says: element-66
2> Kafka and Flink1 says: element-66
そしてここでは、コードのセグメントである:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
messageStream.rebalance().map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return "Kafka and Flink1 says: " + value;
}
}).print();
env.execute();
}
私が実行しようとしていますそれは2回、そして逆もまたあります: main関数内にそれぞれ2つのデータストリームとenv.execute()を作成します。
私はまた、2人の消費者がflink-kafka-connectorを使用して、flinkの外側のkafkaクライアントを使用して2人の消費者をflinkインスタンス内で実行してみました。 2人の外の消費者が正しく働いているようです(合計2人)。しかし、他の2つのフリンクは独立して働いているように見えました(外部にも、お互いにも)、それぞれメッセージが2つ、合計で4つです。 – PleaseLetMeGo