メッセージングシステムとしてApache Kafkaを使用し、Spring Cloud Stream Kafkaを使用してサービス間で通信するイベントベースのシステムを作成しようとしています。Spring Cloud Streamのストリーム属性で受信したメッセージをフィルタリングできません@StreamListenerアノテーション
私はこの方法はEmployeeCreatedEventに関連するメッセージやイベントのためにキャッチするために特別で、以下のように
@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeCreatedEvent'")
public void handleEmployeeCreatedEvent(@Payload String payload) {
logger.info("Received EmployeeCreatedEvent: " + payload);
}
を私レシーバのクラスメソッドを書かれています。
@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeTransferredEvent'")
public void handleEmployeeTransferredEvent(@Payload String payload) {
logger.info("Received EmployeeTransferredEvent: " + payload);
}
この方法はEmployeeTransferredEventに関連するメッセージやイベントのためにキャッチするために特別です。
@StreamListener(target = Sink.INPUT)
public void handleDefaultEvent(@Payload String payload) {
logger.info("Received payload: " + payload);
}
これがデフォルトの方法です。
私はアプリケーションを実行すると、呼び出されている状態属性でアノテーションされたメソッドを見ることができません。私はhandleDefaultEventメソッドが呼び出されているのを見るだけです。私は以下のように送信/ソースアプリケーションが使用して以下のCustomMessageSourceクラスからこのレシーバアプリケーションにメッセージを送信しています
、
@Component
@EnableBinding(Source.class)
public class CustomMessageSource {
@Autowired
private Source source;
public void sendMessage(String payload,String eventType) {
Message<String> myMessage = MessageBuilder.withPayload(payload)
.setHeader("eventType", eventType)
.build();
source.output().send(myMessage);
}
}
私は以下のようにソースAppで私のコントローラからメソッドを呼び出しています、
customMessageSource.sendMessage("Hello","EmployeeCreatedEvent");
customMessageSourceインスタンスは以下のようにautowiredされ、
@Autowired
CustomMessageSource customMessageSource;
基本的に、Sink/Receiverアプリケーションで受信したメッセージをフィルタリングし、それに応じて処理したいと思います。
このため、さまざまなイベントを処理する動作をシミュレートするために、condition属性で@StreamListenerアノテーションを使用しました。
私はSpring Cloud Stream Chelsea.SR2バージョンを使用しています。
誰かがこの問題の解決に手伝ってくれますか?
お返事ありがとうございます。しかし、私はあなたの応答を得ることはありません。私は、ペイロードとヘッダを入力として受け取ってメッセージを明示的に構築しています。私はなぜ設定ファイルにカスタムヘッダーを含めるべきかわかりません。 – juser
Springクラウドストリームはデフォルトでカスタムヘッダーを伝播しません。この設定はプロデューサアプリケーションに指示する必要があります。 –
カフカ(現在)はヘッダーの概念を持っていません。それらをデータに埋め込む必要があります。したがって、不要なデータの追加を避けるために、オプトインアプローチを採用しています。 –