2016-11-28 8 views
0

私はSpark-rabbitmq_1.6ライブラリを使用してSpark StreamingからRabbitMQに接続しています。 私が接続しようとしているキューは、X-MAX-長さの制限がある= 1000 私はspark-rabbitmq_1.6のx-max-lengthは機能しません

Map<String, String>rabbitMqConParams = new HashMap<String, String>(); 
rabbitMqConParams.put("hosts", "rabbit.host.com"); 
... 
rabbitMqConParams.put("x-max-length", "1000"); 

JavaReceiverInputDStream<String> receiverStream = RabbitMQUtils.createJavaStream(streamCtx, String.class, rabbitMqConParams, messageHandler); 

X-maxの長さが設定されているが、それはスロー以下のようにウサギコンフィグのparamsを設定以下のエラー。

16/11/28 15:20:27 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Could not connect 
java.io.IOException 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) 
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) 
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844) 
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61) 
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer.declareQueue(Consumer.scala:136) 
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer.setQueue(Consumer.scala:110) 
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer.setQueue(Consumer.scala:82) 
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$2.apply(RabbitMQInputDStream.scala:64) 
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$2.apply(RabbitMQInputDStream.scala:58) 
.... 
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-max-length' for queue 'aeon.output' in vhost '/': received '1000' but current is '1000', class-id=50, method-id=10) 
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) 
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) 
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361) 
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226) 
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) 

なぜこのようなことが起こる可能性がありますか? 何か助けていただければ幸いです。

ありがとうございました。

答えて

関連する問題