2017-06-27 3 views
0

kafka client 2.11: 0.10.2.1を使用する小さなJavaスパークサービスがあります。続き新しいカフカクライアントを使用して古いカフカによって公開されたトピックのパーティション情報を取得するときにタイムアウトします

は、私は、最新のカフカのバージョンから出版され話題を読んだときに正常に動作コードです:

Properties props = new Properties(); 
    props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerConfig.getBrokerConnectionString()); 
    props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, "all"); 
    props.put(org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG, producerConfig.getRetry()); 
    props.put(org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG, producerConfig.getBatchSize()); 
    props.put(org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG, producerConfig.getLingerTimeInMs()); 
    props.put(org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, producerConfig.getRequestTimeout()); 
    props.put(org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG, producerConfig.getMaxBlockMS()); 
    props.put(org.apache.kafka.clients.producer.ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, producerConfig.getMaxIdleTime()); 
    props.put(org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG, maxBytesInBuffer/producerConfig.getProducersCount()); 
    props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); 
    producers = new Producer[1]; 
    producers[0] = new KafkaProducer<>(props); 
    producers[0].partitionsFor("mYTopic").size(); 

カフカのバージョンが0.8.2.x ある既存のカフカトピックは、があります。私はこれと同じコードを使いたいと思っていました。しかし、このコードでは、最後の行(partitionsFor)でタイムアウトが発生し、Kafkaのバージョン0.8.2.xのトピックが公開されています。この点に関する助けに感謝します。要するに

:一般のブローカーでは0.10.2.1クライアントによって

+1

ドキュメントhttp://kafka.apache.org/documentation/#upgradeによると、0.10.2クライアントは0.8.2ブローカと通信できません。 –

答えて

0

を読み取ることができません(0.8.2.x刊)カフカのトピックはクライアントに関して下位互換性がありますが、クライアントがに関しては後方互換性がありませんブローカー。これは、最新のリリースで少し緩和されていますが、0.10より古いブローカーにはまだ保持されています。バージョン0.10.2以降で

http://kafka.apache.org/documentation.html#upgrade

、Javaクライアント(生産者と消費者は)古いブローカーと通信する能力を獲得しています。バージョン0.11.0のクライアントは、バージョン0.10.0以降のブローカーと通信できます。ただし、ブローカーが0.10.0より古い場合は、クライアントをアップグレードする前にKafkaクラスター内のすべてのブローカーをアップグレードする必要があります。バージョン0.11.0のブローカーは、0.8.x以降のクライアントをサポートしています。

関連する問題