私は、次のコードを使用してJMSのActiveMQからのメッセージを消費しています:春の統合 - JMSは、カフカのメッセージ転送に - エンドのトランザクションに終わり
<jms:message-driven-channel-adapter
id="helloJMSAdapater" destination="helloJMSQueue" connection-factory="jmsConnectionfactory"
channel="helloChannel" extract-payload="true" />
<integration:channel id="helloChannel" />
私の要件は、ここから消費し、カフカアウトバウンド・アダプタに投稿することです。以下の設定を使用する:
マイキューは耐久性のある話題であり、それが正常にカフカに公開されていない限り、レコードを確認したいいけない:ここで
は私が達成したいものです。要するに、私は、jmsからのメッセージを消費して、それをKafkaに公開するというトランザクションの振る舞いをしたいのです。<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="kafkaProducerContext" channel="inputToKafka"> </int-kafka:outbound-channel-adapter>
メッセージが即座にデキューされ、処理中に例外が発生した場合、再処理できません。私はそれが起こることを望んでいない。
- また、kafkaが何らかの問題に遭遇したとき、私は失敗メッセージを持続することができるようにそれをいくつかの方法に戻したいと思います。
私は実際に動作させるのに苦労しています。誰かが私を助けてくれますか?
Thanks Artem。私はそれを試して元に戻す。 Apache Kafkaについて、私はSpringのリソースからリファレンスを取得しました。 Kafkaから例外をトラップし、誤ったメッセージを一部の店舗に残す方法はありますか?カフカアダプタはエラーチャンネルのようなものを提供しますか? –
また、(新2.0)カフカアダプタで 'sync = true'が必要です。 JMSインバウンドアダプタにエラーチャネルを追加できます。メッセージペイロードは 'cause'と' failedMessage'プロパティを持つ 'MessagingException'です。 –
いいえ、それは 'ExpressionEvaluatingRequestHandlerAdvice'で行うことができます:http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#expression-advice –