3

カフカのドキュメント以下、約アプローチを与えるには説明:スレッドごとkafka 0.9.0でマルチスレッドコンシューマを使用するには?

つのコンシューマ:簡単なオプションを各スレッドに独自の消費者>のインスタンスを与えることです。

マイコード:

public class KafkaConsumerRunner implements Runnable { 

    private final AtomicBoolean closed = new AtomicBoolean(false); 
    private final CloudKafkaConsumer consumer; 
    private final String topicName; 

    public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) { 
     this.consumer = consumer; 
     this.topicName = topicName; 
    } 

    @Override 
    public void run() { 
     try { 
      this.consumer.subscribe(topicName); 
      ConsumerRecords<String, String> records; 
      while (!closed.get()) { 
       synchronized (consumer) { 
        records = consumer.poll(100); 
       } 
       for (ConsumerRecord<String, String> tmp : records) { 
        System.out.println(tmp.value()); 
       } 
      } 
     } catch (WakeupException e) { 
      // Ignore exception if closing 
      System.out.println(e); 
      //if (!closed.get()) throw e; 
     } 
    } 

    // Shutdown hook which can be called from a separate thread 
    public void shutdown() { 
     closed.set(true); 
     consumer.wakeup(); 
    } 

    public static void main(String[] args) { 
     CloudKafkaConsumer kafkaConsumer = KafkaConsumerBuilder.builder() 
       .withBootstrapServers("172.31.1.159:9092") 
       .withGroupId("test") 
       .build(); 
     ExecutorService executorService = Executors.newFixedThreadPool(5); 
     executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log")); 
     executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log.info")); 
     executorService.shutdown(); 
    } 
} 

はそれが仕事と例外をスローしません:

java.util.ConcurrentModificationExceptionが:KafkaConsumerは、マルチスレッドアクセス

のために安全ではありません

さらに、私はFlinkのソースを読んでいます。 dバッチデータ処理)。マルチスレッドの消費者を使用しているフリンクは私のものに似ています。

long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT))); 
pollLoop: while (running) { 
    ConsumerRecords<byte[], byte[]> records; 
    //noinspection SynchronizeOnNonFinalField 
    synchronized (flinkKafkaConsumer.consumer) { 
     try { 
      records = flinkKafkaConsumer.consumer.poll(pollTimeout); 
     } catch (WakeupException we) { 
      if (running) { 
       throw we; 
      } 
      // leave loop 
      continue; 
     } 
    } 

flink code of mutli-thread

何が問題になっています ?

答えて

6

カフカ消費者はスレッドセーフでないです。あなたは、あなたの質問に指摘したように、文書は

は、単純なオプションは、独自の消費者インスタンス

しかし、あなたのコード内のスレッドそれぞれを与えることです、あなたは別のことで包まれた同じ消費者のインスタンスを持っていると述べましたKafkaConsumerRunnerインスタンス。したがって、複数のスレッドが同じコンシューマインスタンスにアクセスしています。カフカのドキュメントは明確に記載されています

カフカの消費者はスレッドセーフではありません。すべてのネットワークI/Oは、呼び出しを行っているアプリケーションのスレッドの で発生します。 マルチスレッドアクセスが正しく同期されていることをユーザーが確認するのは、 です。同期されないアクセスは、 ConcurrentModificationExceptionになります。

これはあなたが受け取った例外です。

2

ご登録の際に例外が発生しています。このような同期ブロックにブロックthis.consumer.subscribe(topicName);

移動:

@Override 
public void run() { 
    try { 
     synchronized (consumer) { 
      this.consumer.subscribe(topicName); 
     } 
     ConsumerRecords<String, String> records; 
     while (!closed.get()) { 
      synchronized (consumer) { 
       records = consumer.poll(100); 
      } 
      for (ConsumerRecord<String, String> tmp : records) { 
       System.out.println(tmp.value()); 
      } 
     } 
    } catch (WakeupException e) { 
     // Ignore exception if closing 
     System.out.println(e); 
     //if (!closed.get()) throw e; 
    } 
} 
+0

私のために働く。 – Prasath

2

はたぶんあなたのケースではありませんが、あなたはserveralのトピックのデータの処理をmerginしている場合は、あなたが複数のトピックからデータを読み取ることができます同じ消費者。そうでなければ、各トピックを消費する別々のジョブを作成することが望ましいです。

関連する問題