kafka client 2.11: 0.10.2.1を使用する小さなJavaスパークサービスがあります。続き新しいカフカクライアントを使用して古いカフカによって公開されたトピックのパーティション情報を取得するときにタイムアウトします
は、私は、最新のカフカのバージョンから出版され話題を読んだときに正常に動作コードです:
Properties props = new Properties();
props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerConfig.getBrokerConnectionString());
props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, "all");
props.put(org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG, producerConfig.getRetry());
props.put(org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG, producerConfig.getBatchSize());
props.put(org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG, producerConfig.getLingerTimeInMs());
props.put(org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, producerConfig.getRequestTimeout());
props.put(org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG, producerConfig.getMaxBlockMS());
props.put(org.apache.kafka.clients.producer.ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, producerConfig.getMaxIdleTime());
props.put(org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG, maxBytesInBuffer/producerConfig.getProducersCount());
props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producers = new Producer[1];
producers[0] = new KafkaProducer<>(props);
producers[0].partitionsFor("mYTopic").size();
カフカのバージョンが0.8.2.x ある既存のカフカトピックは、があります。私はこれと同じコードを使いたいと思っていました。しかし、このコードでは、最後の行(partitionsFor
)でタイムアウトが発生し、Kafkaのバージョン0.8.2.xのトピックが公開されています。この点に関する助けに感謝します。要するに
:一般のブローカーでは0.10.2.1クライアントによって
ドキュメントhttp://kafka.apache.org/documentation/#upgradeによると、0.10.2クライアントは0.8.2ブローカと通信できません。 –