2

http-outbound-gatewayを持つhttp-processorで、errorChannelとoutputChannelを持つspring-xd http-processorモジュールがあります。 HTTP 200のメッセージはoutputChannelに送られ、残りはfailureChannelに格納されます。kafka-sinkから複数のトピックにメッセージをルーティングする方法

現在、http-processorモジュールは、TopicXを使用したkafka-outbound-adapterを使用してKafka-Sinkに接続します。 TopicXは、さらに処理するためにHTTP 200メッセージのみを受信します。これで、TopicYにルーティングされるfailureChannelのメッセージが必要になります。

kafka-sinkに複数のkafkaトピックを送信するにはどうすればよいですか。私は、メッセージヘッダーにhttpStatusCodeを持っています。私のプロジェクトで使用さカフカのバージョンは0.8.2で、Javaのバージョンが1.7カフカシンクオン

<!-- http-processor-config --> 
<int-http:outbound-gateway 
     request-channel="input" 
     url-expression="'myUrlLink'" 
     http-method="POST" 
     expected-response-type="java.lang.String" 
     charset="UTF-8" 
     reply-timeout="10" 
     reply-channel="output"> 

     <int-http:request-handler-advice-chain> 
        <bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice"> 
         <property name="recoveryCallback"> 
          <bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer"> 
           <constructor-arg ref="errorChannel" /> 
          </bean> 
         </property> 
         <property name="retryTemplate" ref="retryTemplate" /> 
        </bean> 
     </int-http:request-handler-advice-chain> 

</int-http:outbound-gateway> 


<!-- Handle failed messages and route to failureChannel for specific http codes--> 
<int:service-activator input-channel="errorChannel" ref="customErrorHandler" method="handleFailedRequest" output-channel="failureChannel"/> 

、私は次のようしているプロデューサーのコンテキストである:本当だ

<int-kafka:producer-context id="kafkaProducerContext"> 
    <int-kafka:producer-configurations> 
     <int-kafka:producer-configuration broker-list="localhost:9092" 
              topic="${topicX}" 
              key-class-type="java.lang.String" 
              key-serializer="serializer" 
              value-class-type="[B" 
              value-serializer="valueSerializer"/> 
    </int-kafka:producer-configurations> 
</int-kafka:producer-context> 

答えて

1

私は最終的にそれを得ました。今私は、HTTPプロセッサモジュールにスプリッタを追加して0.8.xバージョンの回避策を見つけ、kafka_topic変数をメッセージヘッダに追加しました。 HTTPのステータスコードに基づいて、私はちょうど異なるトピックを設定します。

Kafka-sinkでは、XD paramsで設定した新しいトピック名変数を持つ別のプロデューサ構成を追加しました。 kafka-sourceとkafka-sinkモジュールを複数のストリームで再利用しているので、他の解決策は考えられません。

この特定のkafka-sinkは、入力を別のXDストリームに送信します。そこで、次のストリームの開始時にkafka-sourceモジュールのkafka_topicを削除するヘッダフィルタを追加しました。

続きを読むには:ターゲット・カフカのトピックを設定するためのライン http://docs.spring.io/autorepo/docs/spring-kafka-dist/1.0.2.RELEASE/reference/html/_spring_integration.html

ルック。それが鍵です。

1

は、それがサポートされていません。そうではありません。今年はSpring XDがEOLです。誰もがSpring Cloud Data Flowに移行することをお勧めします。

あなたのケースでは、Kafka Sinkモジュールの設定を編集できます。もう1つのトピックに<int-kafka:outbound-channel-adapter>をもう1つ追加してください。着信メッセージをどこに送信するかを決定するには、この設定に<router>を追加します。

Router Sinkをご利用ください。また、各メッセージタイプごとに2つの別々のストリームがあり、したがってそれぞれのトピックがあります。

+0

私は最終的にそれを得ました。 – Vidhya

関連する問題