私は、春・統合・カフカとのメッセージを消費message-driven-channel-adapter
を使用しています:メッセージの消費中にエラーを正しく処理するにはどうすればよいですか?
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
channel="outputFromKafka"
error-channel="errorChannel"/>
コンテナオブジェクトに入ってくるJSONをデシリアライズするJsonDeserializer
を使用しています。
<beans:bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<beans:constructor-arg>
<beans:bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<beans:constructor-arg>
<beans:map>
<beans:entry key="bootstrap.servers" value="localhost:9092" />
<beans:entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
<beans:entry key="group.id" value="mygroup" />
</beans:map>
</beans:constructor-arg>
<beans:property name="valueDeserializer">
<beans:bean class="org.springframework.kafka.support.serializer.JsonDeserializer">
<beans:constructor-arg value="com.foo.MyType"/>
</beans:bean>
</beans:property>
</beans:bean>
</beans:constructor-arg>
<beans:constructor-arg>
<beans:bean class="org.springframework.kafka.listener.config.ContainerProperties">
<beans:constructor-arg name="topics" value="foo" />
</beans:bean>
</beans:constructor-arg>
</beans:bean>
メッセージができれば(例えば、消費者が誤って間違った型を使用したために)正常に解析されない場合、例外がスローされます。
ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion: org.apache.kafka.common.errors.SerializationException: Can't deserialize data ...
これ以降、アダプタは同じメッセージを再度受信しています(おそらく最後のメッセージがコミットされなかったためでしょうか)。まったく同じ方法で失敗し、無限ストリームの例外が発生します。
設定されたerror-channel
が使用されていないようです。
このようなエラーを処理するオプションは何ですか?また、XMLでどのように設定されますか?
私の編集を参照してください - 例外をキャッチし、デシリアライザに失敗したことを示す何かを返すために、デシリアライザをカスタマイズする必要があります。すなわちいくつかの「特別な」「MyType」インスタンスを含む。 –
あなたは正しいです。 'org.apache.kafka.common.serialization.Deserializer'によってスローされた例外は、Kafkaクライアントコードによって単に捕捉され、記録され、介入する可能性はありません。 デシリアライズされたタイプのマジックインスタンスを使用することは技術的に可能ですが、私の原則のかなりの部分に違反しています;) – Tom
カフカの値デシリアライザを使用するのはJSONにとって良い選択ではないかもしれません。代わりに、StringDeserializerと別のjson toオブジェクトトランスフォーマーは、より柔軟性と堅牢性を提供します。 – Tom