2016-10-17 8 views
1

スプリング統合でkafkaにメッセージを生成できませんでした。スプリング統合kafkaアウトバウンドアダプタエラーハンドル

'error-channel'が 'int-kafka:outbound-channel-adapter'のオプションで、ErrorHandlerがエラーのチャンネル情報を追加して "生成できませんでした" to kafka "というエラーが発生します。 (すべてのタイプの障害、構成、ネットワークなどを含む)

また、inputToKafkaはキューに入れられます。潜在的なキューの完全なエラーを処理するにはエラーチャネルを追加する必要がありますか?

<int:gateway id="myGateway" 
      service-interface="someGateway" 
      default-request-channel="transformChannel" 
      error-channel="errorChannel" 
      default-reply-channel="replyChannel" 
      async-executor="MyThreadPoolTaskExecutor"/> 

<int:transformer id="transformer" input-channel="transformChannel" method="transform" output-channel="inputToKafka"> 
    <bean class="Transformer"/> 
</int:transformer> 

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" 
            kafka-template="template" 
            auto-startup="false" 
            channel="inputToKafka" 
            topic="foo" 
            message-key-expression="'bar'" 
            partition-id-expression="2"> 
    <int:poller fixed-delay="200" time-unit="MILLISECONDS" receive-timeout="0" 
        task-executor="kafkaExecutor"/> 
</int-kafka:outbound-channel-adapter> 

<bean id="kafkaExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
    .... 
</bean> 

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate"> 
    <constructor-arg> 
     <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> 
      <constructor-arg> 
       <map> 
        <entry key="bootstrap.servers" value="localhost:9092" /> 
        ... 
       </map> 
      </constructor-arg> 
     </bean> 
    </constructor-arg> 
</bean> 

<int:service-activator input-channel='errorChannel' output-channel="replyChannel" method='process'> 
    <bean class="ErrorHandler"/> 
</int:service-activator> 

編集

<property name="producerListener"> 
    <bean id="producerListener" class="org.springframework.kafka.support.ProducerListenerAdapter"/> 
</property> 

答えて

1

下流の流れ上のすべてのエラーは、あなたのゲートウェイでerror-channelに送信されます。しかし、kafkaはデフォルトで非同期であるため、エラーは発生しません。送信アダプターにsync=trueを設定すると、問題が発生した場合に例外がスローされます。

でも、それははるかに遅くなります。

KafkaTemplateProducerListenerを追加すると、非同期の例外が発生することがあります。

+0

私は単に私のKafkaTemplate beanに 'edit'パートを追加する必要がありますか? (元の質問の編集を参照してください) – edi

+0

アダプタをサブクラス化し、 'onError()'(またはその両方)で何らかのアクションを取る必要があります。 'onSuccessI()'を実装する場合、 'isInterestedInSuccess()'をtrueにオーバーライドする必要があります。デフォルトのリスナー( 'LoggingProducerListener')はエラーを記録するだけです。 –

+0

私はonErrorがvoidを返すのを見ました。私の元の質問でErrorHandlerにメッセージが送られるようにする方法は?または、replyChannelに情報を返す別のerrorHandlerを実装する他の方法 – edi

関連する問題