2016-09-27 2 views
2

kafkaコンシューマコンポーネント用のテストケースを作成し、ConsumerRecords<String,String>を返すkafkaConsumer.poll()の模擬テストケースを作成しています。私はConsumerRecordsを初期化し、それをモックで使用したいが、ConsumerRecordsのコンストラクタは、テストで持っていない実際のカフカのトピックを期待している。 私はこれについて考えると、オブジェクトのシリアル化されたコピーを保持し、ConsumerRecordsを初期化するためにデシリアライズすることです。 これを達成する他の方法はありませんか?ここでkafka ConsumerRecords <String、String>をテスト用にkafkaで初期化する方法

答えて

4

は、いくつかのサンプルコード(カフカクライアントのlibバージョン0.10.1.1)である:

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.HashMap; 
import java.util.Map; 

import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.MockConsumer; 
import org.apache.kafka.clients.consumer.OffsetResetStrategy; 
import org.apache.kafka.common.TopicPartition; 

... 
     String topic = "MyTopic"; 
     Collection<TopicPartition> partitions = new ArrayList<TopicPartition>(); 
     Collection<String> topicsCollection = new ArrayList<String>(); 
     partitions.add(new TopicPartition(topic, 1)); 
     Map<TopicPartition, Long> partitionsBeginningMap = new HashMap<TopicPartition, Long>(); 
     Map<TopicPartition, Long> partitionsEndMap = new HashMap<TopicPartition, Long>(); 

     long records = 10; 
     for (TopicPartition partition : partitions) { 
      partitionsBeginningMap.put(partition, 0l); 
      partitionsEndMap.put(partition, records); 
      topicsCollection.add(partition.topic()); 
     } 

     MockConsumer<String, MyObject> second = new MockConsumer<String, MyObject>(
       OffsetResetStrategy.EARLIEST); 
     second.subscribe(topicsCollection); 
     second.rebalance(partitions);  
     second.updateBeginningOffsets(partitionsBeginningMap); 
     second.updateEndOffsets(partitionsEndMap); 
     for (long i = 0; i < 10; i++) { 
      MyObject value = Generator.generate(); 
      ConsumerRecord<String, MyObject> record = new ConsumerRecord<String, MyObject>(
        topic, 1, i, null,value); 
      second.addRecord(record); 
     } 
    ... 
関連する問題