2016-07-08 9 views
5

私は単一のノード、マルチ(3)ブローカーZookeeper/Kafkaセットアップを持っています。私はKafka 0.10 Javaクライアントを使用しています。Kafka 0.10 JavaクライアントTimeoutException:1レコードが含まれているバッチが期限切れになった

私は、単純な(カフカとは異なるサーバー上の)リモートプロデューサー(コードでは、私はMYIPと私のパブリックIPアドレスを置き換え)以下に書いた:3つのブローカーのための私のサーバーのプロパティは、(次のようになり

Properties config = new Properties(); 
try { 
    config.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName()); 
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "MYIP:9092, MYIP:9093, MYIP:9094"); 
    config.put(ProducerConfig.ACKS_CONFIG, "all"); 
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); 
    producer = new KafkaProducer<String, byte[]>(config); 
    Schema.Parser parser = new Schema.Parser(); 
    schema = parser.parse(GATEWAY_SCHEMA); 
    recordInjection = GenericAvroCodecs.toBinary(schema); 
    GenericData.Record avroRecord = new GenericData.Record(schema); 
    //Filling in avroRecord (code not here) 
    byte[] bytes = recordInjection.apply(avroRecord); 

    Future<RecordMetadata> future = producer.send(new ProducerRecord<String, byte[]>(datasetId+"", "testKey", bytes)); 
    RecordMetadata data = future.get(); 
} catch (Exception e) { 
    e.printStackTrace(); 
} 

3つの異なるサーバープロパティファイルbroker.idは0,1,2であり、リスナーはPLAINTEXT://:9092、PLAINTEXT://:9093、PLAINTEXT://:9094、host.nameは10.2.0.4,10.2です。 0.5,10.2.0.6)。 これは、最初のサーバーのプロパティファイルされる:私は、コードを実行すると

broker.id=0 
listeners=PLAINTEXT://:9092 
num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600 
log.dirs=/tmp/kafka1-logs 
num.partitions=1 
num.recovery.threads.per.data.dir=1 
log.retention.hours=168 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=300000 
zookeeper.connect=localhost:2181 
zookeeper.connection.timeout.ms=6000 

は、私は次の例外を取得:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0 
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65) 
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52) 
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25) 
    at com.nr.roles.gateway.GatewayManager.addTransaction(GatewayManager.java:212) 
    at com.nr.roles.gateway.gw.service(gw.java:126) 
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790) 
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:821) 
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583) 
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1158) 
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511) 
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1090) 
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) 
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:109) 
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:119) 
    at org.eclipse.jetty.server.Server.handle(Server.java:517) 
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:308) 
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:242) 
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:261) 
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95) 
    at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:75) 
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213) 
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147) 
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654) 
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0 

誰もが私が行方不明です何を知っていますか?どんな助けもありがとう。ありがとうございます

+0

私も上記と同じようにしようとしましたが、1つのブローカー(ポート9092)のみでした。私はまだまったく同じ例外を取得します。リモートマシンのブローカーと飼い猫のポートが開いていることを確認し、プロデューサーマシンからtelnetで接続できるようにしました。 – Armen

答えて

1

BOOTSTRAP_SERVERS_CONFIGのポート情報が正しくありません(MYIP:)。

server.propertiesで「PLAINTEXT://:9093、PLAINTEXT://:9093、PLAINTEXT://:9094」と記述したとおりです。

+0

申し訳ありませんが、ここでタイプミスがありました。server.propertiesには、 "PLAINTEXT://:9092、PLAINTEXT://:9093、PLAINTEXT://:9094"です。したがって、 'BOOTSTRAP_SERVERS_CONFIG'ポートは正しいです。 – Armen

3

同じ問題が発生します。

ipアドレスを指定するには、kafka server.propertiesを変更する必要があります。 例:

PLAINTEXT:// YOUIP:9093

ない場合は、カフカプロデューサーがホストを得ることができないならば、それはあなたがそれらをtelnetで接続することができた場合でも、カフカするメッセージを送信することはできません、ホスト名を使用します。

+0

私のプロパティはserver.propertiesに正しく設定されていますが、リスナー= PLAINTEXT:// domain_name:9092ですが、まだこの例外が発生しています。org.apache.kafka.common.errors.TimeoutException:Batch Expired java.util.concurrent。 ExecutionExceptionは、外部サーバーから接続している間、私はさらに高い値にrequest.timeout.ms \tを増やしました。 –

+0

はOPの質問に明らかに答えません... – Paul

0

This答え共有いくつかの洞察力。プロビジョニング設定のrequest.timeout.msを増やすと、クライアントは期限切れになる前にバッチをより長くキューに入れることができます。

また、batch.sizelinger.msの設定を調べて、最適なケースを見つけることができます。

関連する問題