2017-07-07 13 views
0

私は、次のコードを使用してJMSのActiveMQからのメッセージを消費しています:春の統合 - JMSは、カフカのメッセージ転送に - エンドのトランザクションに終わり

<jms:message-driven-channel-adapter 
     id="helloJMSAdapater" destination="helloJMSQueue" connection-factory="jmsConnectionfactory" 
     channel="helloChannel" extract-payload="true" /> 
<integration:channel id="helloChannel" /> 

私の要件は、ここから消費し、カフカアウトバウンド・アダプタに投稿することです。以下の設定を使用する:

  1. マイキューは耐久性のある話題であり、それが正常にカフカに公開されていない限り、レコードを確認したいいけない:ここで

    <int-kafka:outbound-channel-adapter 
          id="kafkaOutboundChannelAdapter" 
          kafka-producer-context-ref="kafkaProducerContext" 
          channel="inputToKafka"> 
    </int-kafka:outbound-channel-adapter> 
    

    は私が達成したいものです。要するに、私は、jmsからのメッセージを消費して、それをKafkaに公開するというトランザクションの振る舞いをしたいのです。

  2. メッセージが即座にデキューされ、処理中に例外が発生した場合、再処理できません。私はそれが起こることを望んでいない。

  3. また、kafkaが何らかの問題に遭遇したとき、私は失敗メッセージを持続することができるようにそれをいくつかの方法に戻したいと思います。

私は実際に動作させるのに苦労しています。誰かが私を助けてくれますか?

答えて

1

TXを開始するには、<jms:message-driven-channel-adapter>には実際にtransaction-managerを持つことができます。

<int-kafka:outbound-channel-adapter>が例外をスローすると、TXが再び呼び出され、メッセージが再キューされます。

持続エラーに興味がある場合は、<jms:message-driven-channel-adapter>にはerror-channelオプションがありますが、まだTXをrallbackにするには例外を再度スローする必要があります。

すべての機能を有効にするには、募集から終了までのスレッドが1つだけであることを確認する必要があります。フロー内のNo <queue>またはexecutorチャネル。

また、なぜ古いApache Kafkaを使用するのかはっきりしません。

+0

Thanks Artem。私はそれを試して元に戻す。 Apache Kafkaについて、私はSpringのリソースからリファレンスを取得しました。 Kafkaから例外をトラップし、誤ったメッセージを一部の店舗に残す方法はありますか?カフカアダプタはエラーチャンネルのようなものを提供しますか? –

+1

また、(新2.0)カフカアダプタで 'sync = true'が必要です。 JMSインバウンドアダプタにエラーチャネルを追加できます。メッセージペイロードは 'cause'と' failedMessage'プロパティを持つ 'MessagingException'です。 –

+1

いいえ、それは 'ExpressionEvaluatingRequestHandlerAdvice'で行うことができます:http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#expression-advice –

関連する問題