2017-03-07 4 views
0

デフォルトのoutputチャネルを介してストリーム内のプロセッサにメッセージを送信するソースがあります。今私は別のチャネルを介して失敗メッセージを送信したいと思います。SCDFで2つの異なる出力チャネルにメッセージを送信する方法は?

私はSourceから拡張するバインド可能なインターフェイスを作成し、@Outputを使用して余分なチャンネルを追加する必要があると考えました。 SCDFが実際にこのチャンネルのカフカのトピックを作成するようにするにはどうすればよいですか? IOW、ストリーム定義はどのように見えますか?

など。異なるチャネル/トピックを使用して、通常のoutputチャネル/カフカのトピック、およびsource > error-sinkを使用してsource | processor

source | processor | sink source > error-sink

の線に沿って何か。

答えて

1

ダウンストリーム処理のエラーメッセージを追跡する必要がある場合は、Spring Cloud Streamに関連付けられたOOTB DLQ機構を使用できます。 RabbitKafkaの両方でサポートされています。 Spring Cloud Data Flow(SCDF)でDLQをglobal settingまたはストリームごとに有効にできます。

もしもの場合、カスタムチャネルを定義してメッセージを別々に扱うようにしたい場合は、sampleのようなカスタムインターフェイスを作成する必要があります。

ストリームをSCDFにデプロイするときに、それぞれ、spring.cloud.stream.kafka.bindings.<channelName>.producerspring.cloud.stream.kafka.bindings.<channelName>.consumerというバインディングプロパティを使用して、プロデューサとコンシューマ間の宛先を上書きできます。

EDIT:

上記のアプローチがありますけれどもが、私は春のクラウドストリームリード(@マリウス-bogoevici)からはるかに簡単な解決策について学びました。

すでに使用可能なデフォルトのエラー・チャネルがあり、Spring Integrationがそれをサポートしています。

これで、あなたのアプリケーションでカスタムメッセージをデフォルトのエラーチャネルに送信することができます:@Autowire @Qualifier("errorChannel")。実際、このサポートはでもあり、はすべてのOOTBアプリケーションで利用できます。

このエラーチャネルの宛先は、spring.cloud.stream.bindings.error.destination=errorchannel-testでオーバーライドできます.SCDFでは、ストリーム配信時に--propertiesを渡します。例えば

ストリームのfoo --definitionを作成する "MYSOURCEは|ログ"

ストリームfooの--properties「app.mysource.spring.cloud.stream.bindings.error.destinationを展開= errorchannel-test "

+0

ありがとう、Sabby。私はダウンストリームのエラーメッセージを処理したいが、通常のメッセージと同じプロセッサでは処理したくない。また、エラーメッセージはカスタムメッセージです。だからDLQはうまくフィットしていないようですね。 このサンプルは、Javaコードをチャネルに接続するのに役立ちます。しかし、私が苦労しているのは、SCDFにチャンネルをKafkaのトピックに接続させる方法です。ソースからエラープロセッサーへの別のストリームを作成するだけですか? –

+0

これを実現するためにストリーム定義で何もする必要はないことが判明しました。 ストリームアプリでは、別のチャンネルを追加するインターフェースで '@ EnableBinding'を使うだけです。ソースについては、 'MessageChannel'でなければなりません。プロセッサ/シンクの場合は、サブスクライブ可能なチャネルでなければなりません。 Spring Cloud Streamはカフカのトピックにチャンネルをマッピングします。ソースとプロセッサ/シンクがチャンネルに同じ名前を使用していることを確認するだけです。 プロパティを使用して宛先を上書きする必要はありません。 –

関連する問題