2017-08-09 2 views
0

私はカフカフリンクで概念証明をしていましたが、次のことを発見しました:フリフサイドでの作業負荷のためにカフカプロデューサーのエラーが発生する可能性があります。ここでkafka-flink:フリンクジョブに依存するカフカプロデューサーのエラー

されている詳細:

私は「実体」、「値」、「タイムスタンプ」私が使用

のような値で700'000行〜で作られたEDR ??サンプルなどのサンプルファイルがあります。カフカトピックを作成するには、次のコマンドを実行します。

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic gprs 

私はトピックのサンプルファイルをロードするには、次のコマンドを使用します。

[13:00] [email protected]: ~/fms 
% /home/kafka/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic gprs < ~/sample/sample01.EDR 

6時間72時間のスライディングウインドウ(aggregationeachsix、aggregationeachsentytwo)で、各エンティティの値を集計するフリンクサイドジョブがあります。

私は3つのシナリオでした:aggregationeachsixジョブはaggregationeachsixとランニングaggregationeachsentytwoジョブで話題に

  • ロードファイルを実行していると話題に
  • ロードファイルを実行しているすべての仕事をせずに話題に

    1. ロードファイルを

    結果は、最初の2つのシナリオが機能していますが、3番目のシナリオでは、私はカフカプロデューサー側で次のエラーがありますファイルをロードしている間(同一ファイルではありません常に、それが第一、第二、第三、あるいはそれ以降のファイルをすることができます):FLINKはその後、カフカ生産に影響を与えると可能性があり、なぜ私の質問がある

    [plenty of lines before this part] 
        [2017-08-09 12:56:53,409] ERROR Error when sending message to topic gprs with key: null, value: 35 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
        org.apache.kafka.common.errors.TimeoutException: Expiring 233 record(s) for gprs-0: 1560 ms has passed since last append 
        [2017-08-09 12:56:53,409] ERROR Error when sending message to topic gprs with key: null, value: 37 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
        org.apache.kafka.common.errors.TimeoutException: Expiring 233 record(s) for gprs-0: 1560 ms has passed since last append 
        [2017-08-09 12:56:53,409] ERROR Error when sending message to topic gprs with key: null, value: 37 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
        org.apache.kafka.common.errors.TimeoutException: Expiring 233 record(s) for gprs-0: 1560 ms has passed since last append 
        [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1627 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender) 
        [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1626 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender) 
        [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1625 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender) 
        [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1624 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender) 
        [2017-08-09 12:56:53,515] ERROR Error when sending message to topic gprs with key: null, value: 35 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
        org.apache.kafka.common.errors.TimeoutException: Expiring 8 record(s) for gprs-0: 27850 ms has passed since batch creation plus linger time 
        [2017-08-09 12:56:53,515] ERROR Error when sending message to topic gprs with key: null, value: 37 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
        org.apache.kafka.common.errors.TimeoutException: Expiring 8 record(s) for gprs-0: 27850 ms has passed since batch creation plus linger time 
        [plenty of lines after this part] 
    

    、何このエラーを避けるために変更する必要がありますか?

  • 答えて

    0

    flinkとkafka-producerの両方がそれを使用しているときにネットワークが飽和しているように見えるので、TimeoutExceptionsが得られます。

    +0

    すべてが同じサーバー(kafkaとflink)で実行されています... –

    +0

    次に、単一のマシン上で実行されているカフカの限界にぶつかっている可能性があります。 –