FlinkでRabbiMQで高可用性を使用する方法、相関IDと有効化チェックポイントを使用する方法を理解しようとしていますが、機能しません。私のプロデューサーコード:Flink RabbitMQ相関ID
connection = factory.newConnection();
Channel channel = connection.createChannel();
String corrId = java.util.UUID.randomUUID().toString();
BasicProperties props = new AMQP.BasicProperties().builder().correlationId(corrId).build();
channel.queueDeclare("flink-poc", true, false, false, null);
MessageQueue queue = new MessageQueue(500); //Queue of messages to be sent to rabbitmq
Message msg = null;
while ((msg = queue.takeMessage()) != null)
channel.basicPublish("", "flink-poc", props, mapper.writeValueAsBytes(msg));
channel.close();
connection.close();
コンシューマコード:このコードで
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
env.setBufferTimeout(100);
env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE); // start a checkpoint every 1000 ms
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
RabbitSource<Message> rabbitSource = new RabbitSource<Message>(Host, 5672, username, pass, "flink-poc", VirtualHost, true, schema);
messages = env.addSource(rabbitSource, TypeInformation.of(Message.class)
は、ACKが全くのRabbitMQに返されません。どんな助けもありがとうございます。
更新:プリフェッチ(channel.basicQos(15);
)を使用すると、動作は開始されますが、速度は非常に遅くなります。それを改善する方法はありますか? setStreamTimeCharacteristic
が無効になっている場合、結果は発注され、レートが10倍改善されますが、それでもなお非常に低いレートです。これはどのように可能ですか?
ありがとうございました。私は、コードとAPIの残りの部分を見て、私がもっと重要な詳細に気付かなかったことにとても執着していました。 – jag