0

メッセージングシステムとしてApache Kafkaを使用し、Spring Cloud Stream Kafkaを使用してサービス間で通信するイベントベースのシステムを作成しようとしています。Spring Cloud Streamのストリーム属性で受信したメッセージをフィルタリングできません@StreamListenerアノテーション

私はこの方法はEmployeeCreatedEventに関連するメッセージやイベントのためにキャッチするために特別で、以下のように

@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeCreatedEvent'") 
    public void handleEmployeeCreatedEvent(@Payload String payload) { 
     logger.info("Received EmployeeCreatedEvent: " + payload); 
    } 

を私レシーバのクラスメソッドを書かれています。

@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeTransferredEvent'") 
    public void handleEmployeeTransferredEvent(@Payload String payload) { 
     logger.info("Received EmployeeTransferredEvent: " + payload); 
    } 

この方法はEmployeeTransferredEventに関連するメッセージやイベントのためにキャッチするために特別です。

@StreamListener(target = Sink.INPUT) 
    public void handleDefaultEvent(@Payload String payload) { 
     logger.info("Received payload: " + payload); 
    } 

これがデフォルトの方法です。

私はアプリケーションを実行すると、呼び出されている状態属性でアノテーションされたメソッドを見ることができません。私はhandleDefaultEventメソッドが呼び出されているのを見るだけです。私は以下のように送信/ソースアプリケーションが使用して以下のCustomMessageSourceクラスからこのレシーバアプリケーションにメッセージを送信しています

@Component 
@EnableBinding(Source.class) 
public class CustomMessageSource { 
    @Autowired 
    private Source source;     


    public void sendMessage(String payload,String eventType) { 
     Message<String> myMessage = MessageBuilder.withPayload(payload) 
       .setHeader("eventType", eventType) 
       .build(); 
     source.output().send(myMessage); 

    } 

} 

私は以下のようにソースAppで私のコントローラからメソッドを呼び出しています、

customMessageSource.sendMessage("Hello","EmployeeCreatedEvent"); 

customMessageSourceインスタンスは以下のようにautowiredされ、

@Autowired 
CustomMessageSource customMessageSource; 

基本的に、Sink/Receiverアプリケーションで受信したメッセージをフィルタリングし、それに応じて処理したいと思います。

このため、さまざまなイベントを処理する動作をシミュレートするために、condition属性で@StreamListenerアノテーションを使用しました。

私はSpring Cloud Stream Chelsea.SR2バージョンを使用しています。

誰かがこの問題の解決に手伝ってくれますか?

答えて

1

ヘッダーが伝播していないようです。カスタムヘッダーをspring.cloud.stream.kafka.binder.headershttp://docs.spring.io/autorepo/docs/spring-cloud-stream-docs/Chelsea.SR2/reference/htmlsingle/#_kafka_binder_propertiesに含めてください。

+0

お返事ありがとうございます。しかし、私はあなたの応答を得ることはありません。私は、ペイロードとヘッダを入力として受け取ってメッセージを明示的に構築しています。私はなぜ設定ファイルにカスタムヘッダーを含めるべきかわかりません。 – juser

+0

Springクラウドストリームはデフォルトでカスタムヘッダーを伝播しません。この設定はプロデューサアプリケーションに指示する必要があります。 –

+0

カフカ(現在)はヘッダーの概念を持っていません。それらをデータに埋め込む必要があります。したがって、不要なデータの追加を避けるために、オプトインアプローチを採用しています。 –

関連する問題