0
私はSpark-rabbitmq_1.6ライブラリを使用してSpark StreamingからRabbitMQに接続しています。 私が接続しようとしているキューは、X-MAX-長さの制限がある= 1000 私はspark-rabbitmq_1.6のx-max-lengthは機能しません
Map<String, String>rabbitMqConParams = new HashMap<String, String>();
rabbitMqConParams.put("hosts", "rabbit.host.com");
...
rabbitMqConParams.put("x-max-length", "1000");
JavaReceiverInputDStream<String> receiverStream = RabbitMQUtils.createJavaStream(streamCtx, String.class, rabbitMqConParams, messageHandler);
X-maxの長さが設定されているが、それはスロー以下のようにウサギコンフィグのparamsを設定以下のエラー。
16/11/28 15:20:27 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Could not connect
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.spark.streaming.rabbitmq.consumer.Consumer.declareQueue(Consumer.scala:136)
at org.apache.spark.streaming.rabbitmq.consumer.Consumer.setQueue(Consumer.scala:110)
at org.apache.spark.streaming.rabbitmq.consumer.Consumer.setQueue(Consumer.scala:82)
at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$2.apply(RabbitMQInputDStream.scala:64)
at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$2.apply(RabbitMQInputDStream.scala:58)
....
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-max-length' for queue 'aeon.output' in vhost '/': received '1000' but current is '1000', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
なぜこのようなことが起こる可能性がありますか? 何か助けていただければ幸いです。
ありがとうございました。