2016-10-03 13 views
0

私は、春・統合・カフカとのメッセージを消費message-driven-channel-adapterを使用しています:メッセージの消費中にエラーを正しく処理するにはどうすればよいですか?

<int-kafka:message-driven-channel-adapter 
    id="kafkaListener" 
    listener-container="container1" 
    channel="outputFromKafka" 
    error-channel="errorChannel"/> 

コンテナオブジェクトに入ってくるJSONをデシリアライズするJsonDeserializerを使用しています。

<beans:bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"> 
    <beans:constructor-arg> 
     <beans:bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> 
      <beans:constructor-arg> 
       <beans:map> 
        <beans:entry key="bootstrap.servers" value="localhost:9092" /> 
        <beans:entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> 
        <beans:entry key="group.id" value="mygroup" /> 
       </beans:map> 
      </beans:constructor-arg> 
      <beans:property name="valueDeserializer"> 
       <beans:bean class="org.springframework.kafka.support.serializer.JsonDeserializer"> 
        <beans:constructor-arg value="com.foo.MyType"/> 
       </beans:bean> 
      </beans:property> 
     </beans:bean> 
    </beans:constructor-arg> 
    <beans:constructor-arg> 
     <beans:bean class="org.springframework.kafka.listener.config.ContainerProperties"> 
      <beans:constructor-arg name="topics" value="foo" /> 
     </beans:bean> 
    </beans:constructor-arg> 
</beans:bean> 

メッセージができれば(例えば、消費者が誤って間違った型を使用したために)正常に解析されない場合、例外がスローされます。

ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion: org.apache.kafka.common.errors.SerializationException: Can't deserialize data ... 

これ以降、アダプタは同じメッセージを再度受信しています(おそらく最後のメッセージがコミットされなかったためでしょうか)。まったく同じ方法で失敗し、無限ストリームの例外が発生します。

設定されたerror-channelが使用されていないようです。

このようなエラーを処理するオプションは何ですか?また、XMLでどのように設定されますか?

答えて

0

設定されたエラーチャネルが使用されていないようです。

あなたはそれを何と考えますか?

私はちょうどerrorChannelにエラーチャネルセット(ログアダプタでデフォルト、)でテストを実行しました...

12:06:08.366 [container-kafka-consumer-1] ERROR o.s.i.handler.LoggingHandler - ... 

あなたは失敗を示すorg.springframework.integrationのデバッグログスニペットを提供することはできますか?春の統合がメッセージを取得する前に、スタック内のダウンはるかに低いです

EDIT

あ、ごめん...

ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion: org.apache.kafka.common.errors.SerializationException: Can't deserialize data ... 

+0

私の編集を参照してください - 例外をキャッチし、デシリアライザに失敗したことを示す何かを返すために、デシリアライザをカスタマイズする必要があります。すなわちいくつかの「特別な」「MyType」インスタンスを含む。 –

+0

あなたは正しいです。 'org.apache.kafka.common.serialization.Deserializer'によってスローされた例外は、Kafkaクライアントコードによって単に捕捉され、記録され、介入する可能性はありません。 デシリアライズされたタイプのマジックインスタンスを使用することは技術的に可能ですが、私の原則のかなりの部分に違反しています;) – Tom

+0

カフカの値デシリアライザを使用するのはJSONにとって良い選択ではないかもしれません。代わりに、StringDeserializerと別のjson toオブジェクトトランスフォーマーは、より柔軟性と堅牢性を提供します。 – Tom

関連する問題