http-outbound-gatewayを持つhttp-processorで、errorChannelとoutputChannelを持つspring-xd http-processorモジュールがあります。 HTTP 200のメッセージはoutputChannelに送られ、残りはfailureChannelに格納されます。kafka-sinkから複数のトピックにメッセージをルーティングする方法
現在、http-processorモジュールは、TopicXを使用したkafka-outbound-adapterを使用してKafka-Sinkに接続します。 TopicXは、さらに処理するためにHTTP 200メッセージのみを受信します。これで、TopicYにルーティングされるfailureChannelのメッセージが必要になります。
kafka-sinkに複数のkafkaトピックを送信するにはどうすればよいですか。私は、メッセージヘッダーにhttpStatusCodeを持っています。私のプロジェクトで使用さカフカのバージョンは0.8.2で、Javaのバージョンが1.7カフカシンクオン
<!-- http-processor-config -->
<int-http:outbound-gateway
request-channel="input"
url-expression="'myUrlLink'"
http-method="POST"
expected-response-type="java.lang.String"
charset="UTF-8"
reply-timeout="10"
reply-channel="output">
<int-http:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
<property name="recoveryCallback">
<bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
<constructor-arg ref="errorChannel" />
</bean>
</property>
<property name="retryTemplate" ref="retryTemplate" />
</bean>
</int-http:request-handler-advice-chain>
</int-http:outbound-gateway>
<!-- Handle failed messages and route to failureChannel for specific http codes-->
<int:service-activator input-channel="errorChannel" ref="customErrorHandler" method="handleFailedRequest" output-channel="failureChannel"/>
、私は次のようしているプロデューサーのコンテキストである:本当だ
<int-kafka:producer-context id="kafkaProducerContext">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="localhost:9092"
topic="${topicX}"
key-class-type="java.lang.String"
key-serializer="serializer"
value-class-type="[B"
value-serializer="valueSerializer"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
私は最終的にそれを得ました。 – Vidhya