2

私はメッセージがHTTPエンドポイントではなく、JMSキューから来る1つの違いで、この1 How to create a Spring Reactor Flux from a ActiveMQ queue?HTTP統合フローからSpring Reactor Fluxを作成するにはどうすればいいですか?

に非常に似て質問があります。問題は、Message Channelが何らかの理由で実装されていないか、Flux.from()によって取得されていないことです。ログエントリはGenericMessageがパス変数としてペイロードを持つHttp Integrationフローから作成されたものの、チャネルにエンキュー/パブリッシュされないことを示していますか?試しました.channel(MessageChannels.queue()).channel(MessageChannels.publishSubscribe()) 違いはありません、イベントストリームは空です。ここでは、コードは次のとおりです。

@Bean 
public Publisher<Message<String>> httpReactiveSource() { 
     return IntegrationFlows. 
       from(Http.inboundChannelAdapter("/eventmessage/{id}") 
         .requestMapping(r -> r 
         .methods(HttpMethod.POST)                     
         ) 
         .payloadExpression("#pathVariables.id") 
         )       
         .channel(MessageChannels.queue()) 
         .log(LoggingHandler.Level.DEBUG)        
         .log() 
         .toReactivePublisher(); 
    } 


@GetMapping(value="eventmessagechannel/{id}", produces=MediaType.TEXT_EVENT_STREAM_VALUE) 
public Flux<String> eventMessages(@PathVariable String id){  
    return Flux.from(httpReactiveSource())    
      .map(Message::getPayload); 

} 

UPDATE1:

build.gradle

buildscript { 
    ext { 
     springBootVersion = '2.0.0.M2' 
    } 
    repositories { 
     mavenCentral() 
     maven { url "https://repo.spring.io/snapshot" } 
     maven { url "https://repo.spring.io/milestone" } 
    } 
    dependencies { 
     classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") 
    } 
} 

apply plugin: 'java' 
apply plugin: 'eclipse' 
apply plugin: 'org.springframework.boot' 
apply plugin: 'io.spring.dependency-management' 

version = '0.0.1-SNAPSHOT' 
sourceCompatibility = 1.8 

repositories { 
    mavenCentral() 
    maven { url "https://repo.spring.io/snapshot" } 
    maven { url "https://repo.spring.io/milestone" } 
} 


dependencies { 
    compile('org.springframework.boot:spring-boot-starter-freemarker') 
    compile('org.springframework.boot:spring-boot-starter-integration') 
    compile('org.springframework.boot:spring-boot-starter-web') 
    compile('org.springframework.boot:spring-boot-starter-webflux') 
    compile('org.springframework.integration:spring-integration-http') 
    testCompile('org.springframework.boot:spring-boot-starter-test') 
    testCompile('io.projectreactor:reactor-test') 

} 

アップデート2

それは@SpringBootApplication@RestControllerは、1つのファイルで定義されているときに動作しますが、ときに動作を停止@SpringBootApplication@RestControllerは別々のファイルにあります。

TestApp.java

package com.example; 

import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 


@SpringBootApplication 
public class TestApp { 
    public static void main(String[] args) { 
      SpringApplication.run(TestApp.class, args); 
    } 
} 

TestController.java

package com.example.controller; 


import org.springframework.context.annotation.Bean; 
import org.reactivestreams.Publisher; 
import org.springframework.http.HttpMethod; 
import org.springframework.http.MediaType; 
import org.springframework.integration.dsl.IntegrationFlows; 
import org.springframework.integration.dsl.channel.MessageChannels; 
import org.springframework.integration.handler.LoggingHandler; 
import org.springframework.integration.http.dsl.Http; 
import org.springframework.messaging.Message; 
import org.springframework.web.bind.annotation.RestController; 



import org.springframework.web.bind.annotation.GetMapping; 
import reactor.core.publisher.Flux; 



@RestController 
public class TestController { 
    @Bean 
     public Publisher<Message<String>> httpReactiveSource() { 
      return IntegrationFlows. 
        from(Http.inboundChannelAdapter("/message/{id}") 
          .requestMapping(r -> r 
            .methods(HttpMethod.POST) 
          ) 
          .payloadExpression("#pathVariables.id") 
        ) 
        .channel(MessageChannels.queue()) 
        .toReactivePublisher(); 
     } 

     @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) 
     public Flux<String> eventMessages() { 
      return Flux.from(httpReactiveSource()) 
        .map(Message::getPayload); 
     } 

} 
+0

どのようなSpringブート設定( 'pom')を使用しますか?サーブレットコンテナとWebFluxをどのように組み合わせますか? –

+0

こんにちはArtem、それを調べてくれてありがとう。私は 'SpringBootApplication v。 '2.0.0.M2''のデフォルトでそれを試しています。私は上記の2つのメソッドを持つ '@ RestController'を持っています。そして、それは私のために働くActiveMQのトピックのあなたの答えのように、基本的にはすべて同じです(config-wise)。しかし、JMSキューの代わりに、http REST呼び出しからメッセージを受け取ります。 – Mike

+0

私はちょうど使用されているgradleの依存関係で質問を更新しました。ありがとう! – Mike

答えて

0

これがうまく私の作品:

@SpringBootApplication 
@RestController 
public class SpringIntegrationSseDemoApplication { 

    public static void main(String[] args) { 
     SpringApplication.run(SpringIntegrationSseDemoApplication.class, args); 
    } 

    @Bean 
    public Publisher<Message<String>> httpReactiveSource() { 
     return IntegrationFlows. 
       from(Http.inboundChannelAdapter("/message/{id}") 
         .requestMapping(r -> r 
           .methods(HttpMethod.POST) 
         ) 
         .payloadExpression("#pathVariables.id") 
       ) 
       .channel(MessageChannels.queue()) 
       .toReactivePublisher(); 
    } 

    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) 
    public Flux<String> eventMessages() { 
     return Flux.from(httpReactiveSource()) 
       .map(Message::getPayload); 
    } 

} 

私はPOMでこの依存関係を持っている:

<parent> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-parent</artifactId> 
    <version>2.0.0.BUILD-SNAPSHOT</version> 
    <relativePath/> <!-- lookup parent from repository --> 
</parent> 

<dependencies> 
    <dependency> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter-integration</artifactId> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter-web</artifactId> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter-webflux</artifactId> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.integration</groupId> 
     <artifactId>spring-integration-http</artifactId> 
    </dependency> 

    <dependency> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter-test</artifactId> 
     <scope>test</scope> 
    </dependency> 
</dependencies> 

<build> 
    <plugins> 
     <plugin> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-maven-plugin</artifactId> 
     </plugin> 
    </plugins> 
</build> 

私は、アプリケーションを実行する2つの端子があります。SSESを聞くこと

curl http://localhost:8080/events 

を。

そして、私はこれを実行秒1で:だから

curl -X POST http://localhost:8080/message/foo 

curl -X POST http://localhost:8080/message/bar 

curl -X POST http://localhost:8080/message/666 

、最初の端末は次のように応答します。

​​

注意、我々はspring-boot-starter-webflux依存関係を必要としません。 FluxからSSEまでは、サーブレットコンテナ上の通常のMVCでうまく動作します。

Spring Integrationは、すぐにWebFluxもサポートします:https://jira.spring.io/browse/INT-4300

IntegrationFlows 
    .from(Http.inboundReactiveGateway("/sse") 
         .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE))) 

、完全には、任意のサーブレットコンテナ依存せずに、単にだけWebFluxを頼る:だから、あなたはそこのようなものを設定することができるようになります。

+0

こんにちはアルテム、もう一度説明してくれてありがとう。あなたのコードは動作しています。なぜなら、あるファイルに定義されている '@ SpringBootApplication'と' @ RestController'の何らかの理由で、それらが動作するのですが、それらが別々のファイルにある(実際には実際のケース)それはしません。 – Mike

+0

あなたの言ってる事がわかります。月曜日にそれを再生します –

+0

見て、問題はあなたの 'TestController'に' @Configuration'がありません。正しく、 'httpReactiveSource()' '@ Bean'は" light configuration "メカニズムのおかげで正しく処理され、登録されます。しかし、 'eventMessages()'メソッドから直接 'httpReactiveSource()'メソッドを呼び出すので、プロキシ呼び出しはなく、適切なbean解決のために 'httpReactiveSource()'はBeanファクトリに委譲しません。ですから、 'httpReactiveSource()'定義を '@Configuration'クラスに移動し、' Publisher > 'の' TestController'に '@ Autowired'を使用することを検討してください。 –

関連する問題