spring-cloud-stream-binder-ibm-mqを使用して、SpringブートストリームをMQueueで使用する方法を理解しようとしています。私はMQueueに接続できますが、Could not provision topic 'queue///EMB_DEV_QUEUE'
とMQJE001: Completion Code '2', Reason '2035'
を取得します。私はそれがキューでありトピックではないことを管理者に確認しました。IBM MQueueでキューの代わりにトピックを作成しようとするSpring Stream
simplest-sample-applications-using-websphere-mq-jmsに基づいてMQQueueConnectionFactory
を使用していくつかのサンプルコードを使用して接続できるので、私はMQueueが動作していることを知っています。
ここは私のプログラムです。私は成功したカフカのために同じパターンを使いました。
@EnableBinding({Sink.class, Source.class})
@SpringBootApplication
public class MQueueStreamApplication {
private final static AtomicInteger counter = new AtomicInteger();
private final Logger logger = LoggerFactory.getLogger(getClass());
public static void main(String[] args) {
SpringApplication.run(MQueueStreamApplication.class, args);
}
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedRate = "2000"))
public MessageSource<String> timeSource() {
return() -> {
String message = String.format("Timed Message %d", counter.incrementAndGet());
logger.info("Producing Message: {}", message);
return MessageBuilder.withPayload(message).setHeader("Message-Type", "mqueue-stream").build();
};
}
@ServiceActivator(inputChannel = Sink.INPUT)
public void serviceSink(Message<String> message) {
String payload = String.valueOf(message.getPayload());
logger.info("Received Message: {} [{}]", payload, message.getHeaders());
}
}
ここは私のapplication.yml
です。私はqueue:///
という接頭辞を付けて試してみました。サンプルプログラムは接頭辞を使用して動作します。
spring:
cloud:
stream:
bindings:
input:
destination: queue:///EMB_DEV_QUEUE
group: mqueue-stream
# binder: ibmmq
output:
destination: queue:///EMB_DEV_QUEUE
ibmmq:
host: vm-dev-q01.corp.int
port: 1414
channel: EMB_DEV_CHANNEL
queueManager: EMB_DEV_QMGR
ここは私のGradleビルドです。
buildscript {
ext {
springBootVersion = '1.5.3.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
mavenLocal()
mavenCentral()
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-actuator')
compile('org.springframework.cloud:spring-cloud-stream')
compile('org.springframework.cloud:spring-cloud-stream-binder-jms-ibm-mq:1.0.0.BUILD-SNAPSHOT')
testCompile('org.springframework.boot:spring-boot-starter-test')
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:Dalston.RELEASE"
}
}
私はspring-cloud-stream-binder-ibm-mq
を作成しました。私はMQueueインストールから2つの必要なjarファイルを取得しました。マニフェストはバージョン9.0.0.0
ですので、私はpom.xml
に
私はMQueueには新しく、Streamsの経験は限られています。私はカフカにうまくつながりました。助けていただければ幸いです。
ウェールズ。
私は、数回はそれほど確実ではないことを読んだ。おそらくMQでの私の経験の欠如。 IBMのドキュメントでは、トピックは単なるラベルであることを暗示していますが、トピックにアクセスしているように見えるコードをステップ実行し、次にtopic.queueの規則を使用してトピックでキューを表示します。それは構造のように聞こえるか、ちょうどサブスクリプションですか?私の目標は、メッセージを処理できない場合にメッセージを返す消費者を持つことです。最初はClient_Ackだと思っていましたが、今は2つの目的地を考えています。 1つは再試行、もう1つは再試行が失敗した場合です。彼らはトピックでなければなりませんか?遅れて同じキューになるように再試行してください。違う失敗。どうも – Wes