2017-10-23 17 views
0

私は0.8.2.0から0.11.0.0にKafkaクライアントを更新しています。Kafka:バージョン0.11のConsumerConnectorの新しいAPI

私の古いコードでは、createMessageStreamsメソッドを使用してメッセージストリームを取得し、各トピックのストリームを繰り返し処理するのには、ConsumerConnectorを使用します。しかし、ConsumerConnectorは新しいAPIで償却されているようです。

package kafka.consumer 

import ... 
/** 
* Main interface for consumer 
*/ 
@deprecated("This trait has been deprecated and will be removed in a future release.", "0.11.0.0") 
trait ConsumerConnector { 
    ... 
    def createMessageStreams[K,V](topicCountMap: Map[String,Int], 
           keyDecoder: Decoder[K], 
           valueDecoder: Decoder[V]): Map[String,List[KafkaStream[K,V]]] 
    ... 
} 

私は新しいAPIを見上げると2つの候補が見つかりました:org.apache.kafka.streams にorg.apache.kafka.clients.consumer

  • Stream API

    • Client API

      どちらを使用しますか?そして、私は新しいカフカAPIで同じことをどのように達成できますか?

  • 答えて

    1

    新しい消費者の例は以下の通りです:

    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("group.id", "test"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    consumer.subscribe(Arrays.asList("foo", "bar")); 
    while (true) { 
        ConsumerRecords<String, String> records = consumer.poll(100); 
        for (ConsumerRecord<String, String> record : records) 
         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 
    } 
    

    は詳細についてhttps://kafka.apache.org/0110/javadoc/index.htmlを参照してください。

    +0

    最新のAPIでは、 'ストリーム'を作成するのではなく、定期的に 'poll'する必要がありますか?私が「ストリーム」を持っていれば、カフカはレコードを送っていますので、私は常にポーリングをする必要はありません。最新のAPIでそれを行う方法はありますか、代わりにあなたの方法を使用する理由はありますか? – gyoho

    関連する問題