2017-09-18 12 views
1

処理する人を送信し、すべてのエラーを収集するサンプルアプリケーションを作成しようとしています.JavaScriptを使用してシリアル化しています。プロセッサとシンク間のメッセージの変換

しかし、私のシンクはプロセッサから受信したメッセージを変換しようとすると失敗します。 application/jsonではなくcontentTypeの異なるメッセージを受信して​​いるようです。これを実現するにはどうすればよいですか?私は春の雲Dalston.SR3

メインを使用してい

@SpringBootApplication 
public class PersonStreamApplication { 

    @Bean 
    public MessageConverter customMessageConverter() { 
     MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); 
     converter.setObjectMapper(new ObjectMapper().registerModule(new VavrModule())); 
     return converter; 
    } 

    public static void main(String[] args) { 
     SpringApplication.run(PersonStreamApplication.class, args); 
    } 
} 

出典:

@EnableBinding(PersonSource.Source.class) 
public class PersonSource { 

    @Bean 
    @InboundChannelAdapter(value = Source.SAMPLE, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")) 
    public MessageSource<Person> getDate() { 
     Map<String, Object> headers = new HashMap<>(); 
     headers.put("directory", "/dir"); 
     return() -> createMessage(new Person("[email protected]", 28), new MessageHeaders(headers)); 
    } 

    public interface Source { 

     String SAMPLE = "sample-source"; 

     @Output(SAMPLE) 
     MessageChannel sampleSource(); 
    } 
} 

プロセッサ:

@RequiredArgsConstructor 
@EnableBinding(Processor.class) 
public class PersonProcessor { 

    private final PersonValidator validator; 

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) 
    public Either<String, Person> process(Person person) { 
     return this.validator.validate(person).toEither(); 
    } 
} 

シンク:

@EnableBinding(PersonSink.Sink.class) 
public class PersonSink { 

    @Bean 
    public IntegrationFlow log() { 
     return IntegrationFlows.from(Sink.SAMPLE).log().get(); 
    } 

    public interface Sink { 

     String SAMPLE = "sample-sink"; 

     @Input(SAMPLE) 
     SubscribableChannel sampleSink(); 
    } 
} 

application.ymlで

spring: 
    cloud: 
    stream: 
     bindings: 
     sample-source: 
      destination: testing.stream.input 
     input: 
      destination: testing.stream.input 
     output: 
      content-type: application/json 
      destination: testing.stream.output 
     sample-sink: 
      destination: testing.stream.output 

答えて

1

あなたの問題がある:https://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#_using_streamlistener_for_automatic_content_type_handling

区別:あなたは、利得にContent-Type変換を取得するために@StreamListenerを使用する必要が

@Bean 
public IntegrationFlow log() { 
    return IntegrationFlows.from(Sink.SAMPLE).log().get(); 
} 

@StreamListenerと春インテグレーション@ServiceActivatorは、文字列ペイロードとcontentTypeヘッダーがapplication/jsonのインバウンドメッセージを考慮する場合に表示されます。 @StreamListenerの場合、MessageConverterメカニズムはcontentTypeヘッダーを使用して、ペイロードをVoteオブジェクトに解析します。

+0

シンクを '@ StreamListener'を使用するように変更すると、データをフローに渡すためにMessageChannelを注入する必要がありますか?別の問題は、ヘッダを伝播する必要があり、@ StreamListenerで伝播しないことです。 –

+1

右、 'IntegrationFlow'のチャンネルに送るには、' @ StreamListener'と一緒に '@ SendTo'を使うべきです。あなたのメソッド引数が 'Message <>'であるか、 '@Headers Map headers'引数を' @ Payload'と一緒に持っていれば、それはヘッダーを伝播します。 OTOHでは、 '.transform(Transformers.fromJson(Person.class))'を代わりに 'IntegrationFlow'で使うと考えるかもしれません。 –

関連する問題