2017-06-27 4 views
0

enable.auto.commitをfalseに設定し、アノテーションベースのspring-kafka @ KafkaListenerを使用してオフセットを手動でコミットしようとすると、org.springframework.kafka .listener.ListenerExecutionFailedException:リスナーメソッドは、着信メッセージで呼び出すことができませんでしたspring-kafka @ KafkaListenerでAcknowledgement.acknowledge()例外をスローする

次のように私は非常に単純なコードを持っている:

@KafkaListener(id = "someid", topics = "${demo.topic}", containerFactory = "someContainerFactory") 
public void listenFooGroup(String message, Acknowledgement ack) { 
    System.out.println("Received Messasge in group 'foo': " + message); 

    // TODO: Do something with the message 
} 

を私は生産者からのメッセージを送信するとき、私は次の取得例外:

org.springframework.kafka.listener.ListenerExecutionFailedException:受信メッセージでリスナーメソッドを呼び出すことができませんでした。

エンドポイントハンドラの詳細:

方法[ます。public void COM **** ***** ******* KafkaMessageListener.listenFooGroup(java.lang.Stringで、org.springframework。。。。 .kafka.support.Acknowledgment)]

ビーン[com.****.*****.*******[email protected]]。ネストされた例外はorg.springframework.messaging.converter.MessageConversionExceptionです:メッセージを処理できません。ネストされた例外はorg.springframework.messaging.converter.MessageConversionExceptionです:GenericMessageの[java.lang.String]から[org.springframework.kafka.support.Acknowledgement]に変換できません[payload = test、headers = {kafka_offset = 57、kafka_receivedMessageKey = NULL、kafka_receivedPartitionId = 0、kafka_receivedTopic =にdemoTopic}]、 failedMessage = GenericMessage [ペイロード=テスト、ヘッダー= {kafka_offset = 57、kafka_receivedMessageKey =ヌル、kafka_receivedPartitionId = 0、kafka_receivedTopic =にdemoTopic}]

お願い。 TIA。

答えて

3

あなたがセットにMANUALまたはMANUAL_IMMEDIATEにコンテナ工場のcontainerProperties ackModeはAcknowledgmentオブジェクトを取得する必要があります。

他のackモードでは、コンテナはオフセットをコミットする責任があります。

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE) 

または[以下不明瞭例外をスロー]うスプリングブート

+0

2.0を使用する場合....ackModeプロパティを設定する(https://github.com/spring-projects/spring-kafka/pull/356) - これを指摘してくれてありがとう。 –

+0

これで例外は発生しません。どうもありがとう。あなたの助けに感謝。 –

+1

次のリリースでは、[もっと意味のある例外](https://github.com/spring-projects/spring-kafka/pull/356)が投げられます。 'new IllegalStateException("肯定応答は引数として利用できません。リスナーコンテナには肯定応答を設定するためのMANUAL Ackmodeが必要です。 "、'。これを指摘してくれてありがとう。 –

関連する問題