2017-03-28 2 views
0

私は複数のブローカーからのメッセージを消費するためにJavaクライアント(Kafka Consumer)を探していました。アドバイスをお願いします複数のブローカーからのメッセージを消費するためにJavaでKafka Consumer Clientを書くには?

以下は、単純なパーティショナーを使用して複数のブローカーにメッセージを公開するために書かれたコードです。

トピックは複製係数 "2"とパーティション "3"で作成されます。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) 
{ 
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); 
    int numPartitions = partitions.size(); 
    logger.info("Number of Partitions " + numPartitions); 
    if (keyBytes == null) 
    { 
     int nextValue = counter.getAndIncrement(); 
     List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); 
     if (availablePartitions.size() > 0) 
     { 
      int part = toPositive(nextValue) % availablePartitions.size(); 
      int selectedPartition = availablePartitions.get(part).partition(); 
      logger.info("Selected partition is " + selectedPartition); 
      return selectedPartition; 
     } 
     else 
     { 
      // no partitions are available, give a non-available partition 
      return toPositive(nextValue) % numPartitions; 
     } 
    } 
    else 
    { 
     // hash the keyBytes to choose a partition 
     return toPositive(Utils.murmur2(keyBytes)) % numPartitions; 
    } 

} 


public void publishMessage(String message , String topic) 
{ 
    Producer<String, String> producer = null; 
    try 
    { 
    producer = new KafkaProducer<>(producerConfigs()); 
    logger.info("Topic to publish the message --" + this.topic); 
    for(int i =0 ; i < 10 ; i++) 
    { 
    producer.send(new ProducerRecord<String, String>(this.topic, message)); 
    logger.info("Message Published Successfully"); 
    } 
    } 
    catch(Exception e) 
    { 
     logger.error("Exception Occured " + e.getMessage()) ; 
    } 
    finally 
    { 
    producer.close(); 
    } 
} 

public Map<String, Object> producerConfigs() 
{ 
    loadPropertyFile(); 
    Map<String, Object> propsMap = new HashMap<>(); 
    propsMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); 
    propsMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
    propsMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
    propsMap.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SimplePartitioner.class); 
    propsMap.put(ProducerConfig.ACKS_CONFIG, "1"); 
    return propsMap; 
} 

public Map<String, Object> consumerConfigs() { 
    Map<String, Object> propsMap = new HashMap<>(); 
    System.out.println("properties.getBootstrap()" + properties.getBootstrap()); 
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrap()); 
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, properties.getAutocommit()); 
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, properties.getTimeout()); 
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupid()); 
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getAutooffset()); 
    return propsMap; 
} 

@KafkaListener(id = "ID1", topics = "${config.topic}", group = "${config.groupid}") 
public void listen(ConsumerRecord<?, ?> record) 
{ 
    logger.info("Message Consumed " + record); 
    logger.info("Partition From which Record is Received " + record.partition()); 
    this.message = record.value().toString(); 
} 

bootstrap.servers = [localhostを:9092、localhostを:9093、localhostの:9094]

答えて

0

を使用すると、通常のJava消費者を使用する場合は、自動的に複数のブローカーから読み込みます。あなたが書く必要がある特別なコードはありません。コンシューマに送信するトピックを購読するだけで、コンシューマは対応するブローカに自動的に接続します。 「単一エントリ・ポイント」ブローカのみを提供します。クライアントは、クラスタの他のすべてのブローカを自動的に特定します。

+0

ご回答ありがとうございます。私は単純なカフカ消費者を使って試しました。メッセージは消費されません。私は消費者ロジックを投稿するために私の質問を編集しました。 – Gopi

+0

'@ KafkaListener'が何をしているのか分かりません...おそらくこれは役に立ちます:http://docs.confluent.io/current/clients/consumer.html –

+0

ありがとうございます。 – Gopi

関連する問題