2015-11-19 11 views

答えて

9

Kafka 0.9.0のドキュメントによれば、新しい消費者を0.8.xブローカーからのデータの読み取りに使用することはできません。理由は次のとおりです。

0.9.0.0は、以前のバージョンとのブローカ間プロトコルの変更点があります。

1

これは *消費者のAPIにいくつかの重要な変更を伴うだろうので、私たちは私たちのコミュニティからの提案についてのフィードバックを収集したいと思います。変更のリストは少ないので、一部の機能が他の機能よりも優先されているかどうか、さらに重要なことに、一部の機能がまったく必要ない場合は理解したいと考えています。

*強調の鉱山。

互換性のない場所は特に見つかりませんでした。しかし、その引用と、0.8のプロデューサが0.7のプロデューサと互換性がないという事実を使用して、私は互換性がないと仮定しています。

5

一般的に、ブローカーは下位互換性を目的としているため、クライアントよりも前にブローカーをアップグレードすることをお勧めします。 0.9ブローカーは0.8消費者APIと0.9消費者APIの両方で動作しますが、それ以外の方法では動作しません。

0

カフカ0.9.0には、後方互換性があります。ドキュメント

0.9.0.0からhttp://kafka.apache.org/documentation.html#upgrade

引用をチェックし潜在的な破壊の変更(アップグレードする前に確認してください)と、以前のバージョンから間ブローカーのプロトコルの変更があります。ローリング・アップグレードの場合:およびすべてのブローカ上

  • 更新server.propertiesファイル次 プロパティを追加します。inter.broker.protocol.version =
  • 0.8.2.Xは、ブローカーをアップグレードします。 これはブローカを一度にダウンロードするだけで、 コードを更新し、ブローカを再起動することで実行できます。
  • クラスタ全体が にアップグレードされたら、 inter.broker.protocol.versionを編集して0.9.0.0に設定してプロトコルバージョンをバンプします。
  • 再起動 ブローカー私は最近、私のアプリケーションでは、私はカフカ0.9から読み込まれた後、バックカフカに書かなければならなかった同様の問題に直面した効果
1

を取るために新しいプロトコルバージョンのための一つ一つ0.8である。私はカフカクライアント0.9を次のように使いました。

コンシューマー・コンフィグ

props.put("bootstrap.servers", "brokers_ip as comma seperated values"); 
    props.put("group.id", "your group id"); 
    props.put("key.deserializer", StringDeserializer.class.getName()); 
    props.put("value.deserializer", StringDeserializer.class.getName()); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", 1000); 
    props.put("session.timeout.ms", 30000); 
    consumer = new KafkaConsumer<String, String>(props); 
    consumer.subscribe("List of topics to subscribe too"); 

プロデューサーコンフィグ

 Properties props = new Properties(); 
     props.put("bootstrap.servers","list of broker ips"); 
     props.put("metadata.broker.list", "list of broker ips"); 
     props.put("serializer.class", "kafka.serializer.StringEncoder"); 
     props.put("acks", "all"); 
     props.put("retries", 0); 
     props.put("batch.size", 16384); 
     props.put("linger.ms", 1); 
     props.put("buffer.memory", 33554432); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer<String, String> producer = new Producer<String, String>(config); 
     String message = "hello world"; 
     KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic_name, message); 
     producer.send(data); 
     producer.close(); 

この情報がお役に立てば幸いです。

+0

あなたは 'producer.flush()'を試しましたか?これはブロッキングコールであり、0.9プロデューサAPIと0.8.1.1ブローカを使用しようとすると、永遠に詰まってしまいます。しかし、この場合、 'KeyedMessage'は' ProducerRecord'に置き換えられます。 – Confused

+0

いいえ、私はそれを試していませんでした。 – abhinav

+0

問題ありません。私は公式メーリングリストで確認しました。バージョン0.8.1.1と0.10.0.0は互換性がありません。 「生産者」、「カフカクラスター」、「消費者」もアップグレードしなければならなかった。 – Confused

関連する問題