ダウンストリーム処理のエラーメッセージを追跡する必要がある場合は、Spring Cloud Streamに関連付けられたOOTB DLQ機構を使用できます。 RabbitとKafkaの両方でサポートされています。 Spring Cloud Data Flow(SCDF)でDLQをglobal settingまたはストリームごとに有効にできます。
もしもがの場合、カスタムチャネルを定義してメッセージを別々に扱うようにしたい場合は、sampleのようなカスタムインターフェイスを作成する必要があります。
ストリームをSCDFにデプロイするときに、それぞれ、spring.cloud.stream.kafka.bindings.<channelName>.producer
とspring.cloud.stream.kafka.bindings.<channelName>.consumer
というバインディングプロパティを使用して、プロデューサとコンシューマ間の宛先を上書きできます。
EDIT:
上記のアプローチがありますけれどもが、私は春のクラウドストリームリード(@マリウス-bogoevici)からはるかに簡単な解決策について学びました。
すでに使用可能なデフォルトのエラー・チャネルがあり、Spring Integrationがそれをサポートしています。
これで、あなたのアプリケーションでカスタムメッセージをデフォルトのエラーチャネルに送信することができます:@Autowire @Qualifier("errorChannel")
。実際、このサポートはでもあり、はすべてのOOTBアプリケーションで利用できます。
このエラーチャネルの宛先は、spring.cloud.stream.bindings.error.destination=errorchannel-test
でオーバーライドできます.SCDFでは、ストリーム配信時に--properties
を渡します。例えば
:
ストリームのfoo --definitionを作成する "MYSOURCEは|ログ"
ストリームfooの--properties「app.mysource.spring.cloud.stream.bindings.error.destinationを展開= errorchannel-test "
ありがとう、Sabby。私はダウンストリームのエラーメッセージを処理したいが、通常のメッセージと同じプロセッサでは処理したくない。また、エラーメッセージはカスタムメッセージです。だからDLQはうまくフィットしていないようですね。 このサンプルは、Javaコードをチャネルに接続するのに役立ちます。しかし、私が苦労しているのは、SCDFにチャンネルをKafkaのトピックに接続させる方法です。ソースからエラープロセッサーへの別のストリームを作成するだけですか? –
これを実現するためにストリーム定義で何もする必要はないことが判明しました。 ストリームアプリでは、別のチャンネルを追加するインターフェースで '@ EnableBinding'を使うだけです。ソースについては、 'MessageChannel'でなければなりません。プロセッサ/シンクの場合は、サブスクライブ可能なチャネルでなければなりません。 Spring Cloud Streamはカフカのトピックにチャンネルをマッピングします。ソースとプロセッサ/シンクがチャンネルに同じ名前を使用していることを確認するだけです。 プロパティを使用して宛先を上書きする必要はありません。 –