処理する人を送信し、すべてのエラーを収集するサンプルアプリケーションを作成しようとしています.JavaScriptを使用してシリアル化しています。プロセッサとシンク間のメッセージの変換
しかし、私のシンクはプロセッサから受信したメッセージを変換しようとすると失敗します。 application/jsonではなくcontentTypeの異なるメッセージを受信しているようです。これを実現するにはどうすればよいですか?私は春の雲Dalston.SR3
メインを使用してい
:
@SpringBootApplication
public class PersonStreamApplication {
@Bean
public MessageConverter customMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setObjectMapper(new ObjectMapper().registerModule(new VavrModule()));
return converter;
}
public static void main(String[] args) {
SpringApplication.run(PersonStreamApplication.class, args);
}
}
出典:
@EnableBinding(PersonSource.Source.class)
public class PersonSource {
@Bean
@InboundChannelAdapter(value = Source.SAMPLE, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
public MessageSource<Person> getDate() {
Map<String, Object> headers = new HashMap<>();
headers.put("directory", "/dir");
return() -> createMessage(new Person("[email protected]", 28), new MessageHeaders(headers));
}
public interface Source {
String SAMPLE = "sample-source";
@Output(SAMPLE)
MessageChannel sampleSource();
}
}
プロセッサ:
@RequiredArgsConstructor
@EnableBinding(Processor.class)
public class PersonProcessor {
private final PersonValidator validator;
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Either<String, Person> process(Person person) {
return this.validator.validate(person).toEither();
}
}
シンク:
@EnableBinding(PersonSink.Sink.class)
public class PersonSink {
@Bean
public IntegrationFlow log() {
return IntegrationFlows.from(Sink.SAMPLE).log().get();
}
public interface Sink {
String SAMPLE = "sample-sink";
@Input(SAMPLE)
SubscribableChannel sampleSink();
}
}
application.ymlで
spring:
cloud:
stream:
bindings:
sample-source:
destination: testing.stream.input
input:
destination: testing.stream.input
output:
content-type: application/json
destination: testing.stream.output
sample-sink:
destination: testing.stream.output
シンクを '@ StreamListener'を使用するように変更すると、データをフローに渡すためにMessageChannelを注入する必要がありますか?別の問題は、ヘッダを伝播する必要があり、@ StreamListenerで伝播しないことです。 –
右、 'IntegrationFlow'のチャンネルに送るには、' @ StreamListener'と一緒に '@ SendTo'を使うべきです。あなたのメソッド引数が 'Message <>'であるか、 '@Headers Map headers'引数を' @ Payload'と一緒に持っていれば、それはヘッダーを伝播します。 OTOHでは、 '.transform(Transformers.fromJson(Person.class))'を代わりに 'IntegrationFlow'で使うと考えるかもしれません。 –