2017-11-22 9 views
2

私はSpring Kafkaを初めて使用しています。私のコンシューマーコードでAcknowledgement.acknowledge()メソッドを手動コミットに使用できません。私の消費者の構成やリスナーコードに欠けていることがあれば教えてください。そうでなければ、条件に基づいて肯定応答オフセットを処理する別の方法があります。 ここで私は、オフセットがコミットされていない/手動で肯定応答しない場合、同じメッセージ/消費者によってオフセットを選択する必要がありますソリューションを探しています。手動コミットのためのスプリングカフカの現在のオフセットを確認する方法

import java.util.HashMap; 
import java.util.Map; 

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.common.serialization.StringDeserializer; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.kafka.annotation.EnableKafka; 
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; 
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; 
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; 

@EnableKafka 
@Configuration 
public class ConsumerConfig { 

@Value(value = "${kafka.bootstrapAddress}") 
private String bootstrapAddress; 

@Value(value = "${kafka.groupId}") 
private String groupId; 

@Bean 
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() { 
Map<String, Object> props = new HashMap<String, Object>(); 
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); 
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); 
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
     StringDeserializer.class); 
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
     StringDeserializer.class); 
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); 
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>(); 
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(
     props)); 
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); 
factory.getContainerProperties().setSyncCommits(true); 
return factory; 
} 
} 
------------------------ 

private static int value = 1; 

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory") 
public void listenPEN_RE(@Payload String message, 
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 
@Header(KafkaHeaders.OFFSET) int offsets, 
Acknowledgment acknowledgment) { 
if (value%2==0){ 
     acknowledgment.acknowledge(); 
} 
value++; 
} 

答えて

0

Apache Kafkaではそうはうまくいきません。

現在実行されているコンシューマに対しては、オフセットをコミットする心配がありません。私たちは、同じ消費者グループの新しい消費者にしか持続させる必要はありません。現在のトラックはメモリ内のオフセットを追跡します。私はブローカーのどこかで推測する。

あなたがラウンド多分次のポーリング同じ消費者で同じメッセージを再フェッチする必要がある場合は、seek()機能を使用することを検討する必要があります。https://docs.spring.io/spring-kafka/docs/2.0.1.RELEASE/reference/html/_reference.html#seek

0

セットのプロパティをfalseに有効 - 自動コミット:

propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG、false);

MANUAL_IMMEDIATEにACKモードを設定します。

factory.getContainerProperties()setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE)を、

次に、あなたの消費者/リスナーのコードでは、手動でオフセットをコミットすることができ、このような:

@KafkaListener(topics = "testKafka") 
public void receive(ConsumerRecord<?, ?> consumerRecord, 
     Acknowledgment acknowledgment) { 

    System.out.println("Received message: "); 
    System.out.println(consumerRecord.value().toString()); 

    acknowledgment.acknowledge(); 
} 

アップデート:私は、このための小さなPOCを作成しました。それを確認してhere、あなたを助けるかもしれない。

関連する問題