私はメッセージがHTTPエンドポイントではなく、JMSキューから来る1つの違いで、この1 How to create a Spring Reactor Flux from a ActiveMQ queue?HTTP統合フローからSpring Reactor Fluxを作成するにはどうすればいいですか?
に非常に似て質問があります。問題は、Message Channelが何らかの理由で実装されていないか、Flux.from()によって取得されていないことです。ログエントリはGenericMessageがパス変数としてペイロードを持つHttp Integrationフローから作成されたものの、チャネルにエンキュー/パブリッシュされないことを示していますか?試しました.channel(MessageChannels.queue())
と.channel(MessageChannels.publishSubscribe())
違いはありません、イベントストリームは空です。ここでは、コードは次のとおりです。
@Bean
public Publisher<Message<String>> httpReactiveSource() {
return IntegrationFlows.
from(Http.inboundChannelAdapter("/eventmessage/{id}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.payloadExpression("#pathVariables.id")
)
.channel(MessageChannels.queue())
.log(LoggingHandler.Level.DEBUG)
.log()
.toReactivePublisher();
}
@GetMapping(value="eventmessagechannel/{id}", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages(@PathVariable String id){
return Flux.from(httpReactiveSource())
.map(Message::getPayload);
}
UPDATE1:
build.gradle
buildscript {
ext {
springBootVersion = '2.0.0.M2'
}
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-freemarker')
compile('org.springframework.boot:spring-boot-starter-integration')
compile('org.springframework.boot:spring-boot-starter-web')
compile('org.springframework.boot:spring-boot-starter-webflux')
compile('org.springframework.integration:spring-integration-http')
testCompile('org.springframework.boot:spring-boot-starter-test')
testCompile('io.projectreactor:reactor-test')
}
アップデート2
それは@SpringBootApplication
と@RestController
は、1つのファイルで定義されているときに動作しますが、ときに動作を停止@SpringBootApplication
と@RestController
は別々のファイルにあります。
TestApp.java
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TestApp {
public static void main(String[] args) {
SpringApplication.run(TestApp.class, args);
}
}
TestController.java
package com.example.controller;
import org.springframework.context.annotation.Bean;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.http.dsl.Http;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Flux;
@RestController
public class TestController {
@Bean
public Publisher<Message<String>> httpReactiveSource() {
return IntegrationFlows.
from(Http.inboundChannelAdapter("/message/{id}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.payloadExpression("#pathVariables.id")
)
.channel(MessageChannels.queue())
.toReactivePublisher();
}
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages() {
return Flux.from(httpReactiveSource())
.map(Message::getPayload);
}
}
どのようなSpringブート設定( 'pom')を使用しますか?サーブレットコンテナとWebFluxをどのように組み合わせますか? –
こんにちはArtem、それを調べてくれてありがとう。私は 'SpringBootApplication v。 '2.0.0.M2''のデフォルトでそれを試しています。私は上記の2つのメソッドを持つ '@ RestController'を持っています。そして、それは私のために働くActiveMQのトピックのあなたの答えのように、基本的にはすべて同じです(config-wise)。しかし、JMSキューの代わりに、http REST呼び出しからメッセージを受け取ります。 – Mike
私はちょうど使用されているgradleの依存関係で質問を更新しました。ありがとう! – Mike