2016-08-30 14 views
2

FlinkでRabbiMQで高可用性を使用する方法、相関IDと有効化チェックポイントを使用する方法を理解しようとしていますが、機能しません。私のプロデューサーコード:Flink RabbitMQ相関ID

connection = factory.newConnection(); 
Channel channel = connection.createChannel(); 

String corrId = java.util.UUID.randomUUID().toString(); 
BasicProperties props = new AMQP.BasicProperties().builder().correlationId(corrId).build(); 

channel.queueDeclare("flink-poc", true, false, false, null); 
MessageQueue queue = new MessageQueue(500); //Queue of messages to be sent to rabbitmq 
Message msg = null; 
while ((msg = queue.takeMessage()) != null) 
    channel.basicPublish("", "flink-poc", props, mapper.writeValueAsBytes(msg)); 
channel.close(); 
connection.close(); 

コンシューマコード:このコードで

StreamExecutionEnvironment env = StreamExecutionEnvironment 
      .getExecutionEnvironment(); 
env.setBufferTimeout(100); 
env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE); // start a checkpoint every 1000 ms 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

RabbitSource<Message> rabbitSource = new RabbitSource<Message>(Host, 5672, username, pass, "flink-poc", VirtualHost, true, schema); 
messages = env.addSource(rabbitSource, TypeInformation.of(Message.class) 

は、ACKが全くのRabbitMQに返されません。どんな助けもありがとうございます。

更新:プリフェッチ(channel.basicQos(15);)を使用すると、動作は開始されますが、速度は非常に遅くなります。それを改善する方法はありますか? setStreamTimeCharacteristicが無効になっている場合、結果は発注され、レートが10倍改善されますが、それでもなお非常に低いレートです。これはどのように可能ですか?

答えて

1

すべてのメッセージに同じcorrelationIdを使用しているため、すべてのメッセージに新しい相関IDを使用する必要があります。 From https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.html 'usesCorrelationId - 受信したメッセージに重複排除メッセージの一意のIDが指定されているかどうか(確認応答に失敗した場合)。チェックポイント設定が有効な場合にのみ使用されます。

+0

ありがとうございました。私は、コードとAPIの残りの部分を見て、私がもっと重要な詳細に気付かなかったことにとても執着していました。 – jag

関連する問題