2017-04-21 5 views
0

spring-cloud-stream-binder-ibm-mqを使用して、SpringブートストリームをMQueueで使用する方法を理解しようとしています。私はMQueueに接続できますが、Could not provision topic 'queue///EMB_DEV_QUEUE'MQJE001: Completion Code '2', Reason '2035'を取得します。私はそれがキューでありトピックではないことを管理者に確認しました。IBM MQueueでキューの代わりにトピックを作成しようとするSpring Stream

simplest-sample-applications-using-websphere-mq-jmsに基づいてMQQueueConnectionFactoryを使用していくつかのサンプルコードを使用して接続できるので、私はMQueueが動作していることを知っています。

ここは私のプログラムです。私は成功したカフカのために同じパターンを使いました。

@EnableBinding({Sink.class, Source.class}) 
@SpringBootApplication 
public class MQueueStreamApplication { 

    private final static AtomicInteger counter = new AtomicInteger(); 
    private final  Logger  logger = LoggerFactory.getLogger(getClass()); 

    public static void main(String[] args) { 
     SpringApplication.run(MQueueStreamApplication.class, args); 
    } 

    @Bean 
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedRate = "2000")) 
    public MessageSource<String> timeSource() { 
     return() -> { 
      String message = String.format("Timed Message %d", counter.incrementAndGet()); 

      logger.info("Producing Message: {}", message); 

      return MessageBuilder.withPayload(message).setHeader("Message-Type", "mqueue-stream").build(); 
     }; 
    } 

    @ServiceActivator(inputChannel = Sink.INPUT) 
    public void serviceSink(Message<String> message) { 
     String payload = String.valueOf(message.getPayload()); 

     logger.info("Received Message: {} [{}]", payload, message.getHeaders()); 
    } 

} 

ここは私のapplication.ymlです。私はqueue:///という接頭辞を付けて試してみました。サンプルプログラムは接頭辞を使用して動作します。

spring: 
    cloud: 
    stream: 
     bindings: 
     input: 
      destination: queue:///EMB_DEV_QUEUE 
      group: mqueue-stream 
#   binder: ibmmq 
     output: 
      destination: queue:///EMB_DEV_QUEUE 

ibmmq: 
    host: vm-dev-q01.corp.int 
    port: 1414 
    channel: EMB_DEV_CHANNEL 
    queueManager: EMB_DEV_QMGR 

ここは私のGradleビルドです。

buildscript { 
    ext { 
     springBootVersion = '1.5.3.RELEASE' 
    } 
    repositories { 
     mavenCentral() 
    } 
    dependencies { 
     classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") 
    } 
} 

apply plugin: 'java' 
apply plugin: 'eclipse' 
apply plugin: 'idea' 
apply plugin: 'org.springframework.boot' 

version = '0.0.1-SNAPSHOT' 

sourceCompatibility = 1.8 
targetCompatibility = 1.8 

repositories { 
    mavenLocal() 
    mavenCentral() 
} 


dependencies { 
    compile('org.springframework.boot:spring-boot-starter-actuator') 

    compile('org.springframework.cloud:spring-cloud-stream') 
    compile('org.springframework.cloud:spring-cloud-stream-binder-jms-ibm-mq:1.0.0.BUILD-SNAPSHOT') 

    testCompile('org.springframework.boot:spring-boot-starter-test') 
} 

dependencyManagement { 
    imports { 
     mavenBom "org.springframework.cloud:spring-cloud-dependencies:Dalston.RELEASE" 
    } 
} 

私はspring-cloud-stream-binder-ibm-mqを作成しました。私はMQueueインストールから2つの必要なjarファイルを取得しました。マニフェストはバージョン9.0.0.0ですので、私はpom.xml

を入れました。

私はMQueueには新しく、Streamsの経験は限られています。私はカフカにうまくつながりました。助けていただければ幸いです。

ウェールズ。

答えて

0

Spring Cloud Streamは、コンシューマ・グループやパーティション化などの機能を実装するために、通常のJMS/IBM-MQアプリケーションよりも慎重なインフラストラクチャを使用します。この場合、宛先はトピックです。詳細はhttps://github.com/spring-cloud/spring-cloud-stream-binder-ibm-mq#how-it-worksを参照してください。

+0

私は、数回はそれほど確実ではないことを読んだ。おそらくMQでの私の経験の欠如。 IBMのドキュメントでは、トピックは単なるラベルであることを暗示していますが、トピックにアクセスしているように見えるコードをステップ実行し、次にtopic.queueの規則を使用してトピックでキューを表示します。それは構造のように聞こえるか、ちょうどサブスクリプションですか?私の目標は、メッセージを処理できない場合にメッセージを返す消費者を持つことです。最初はClient_Ackだと思っていましたが、今は2つの目的地を考えています。 1つは再試行、もう1つは再試行が失敗した場合です。彼らはトピックでなければなりませんか?遅れて同じキューになるように再試行してください。違う失敗。どうも – Wes

関連する問題