2016-06-29 11 views
0

私はカフカの高レベル消費者です。カフカの消費者状態を確認する方法

public class KafkaHighLevelConsumer implements Runnable { 
    private final KafkaConsumer<String, String> consumer; 
    private final List<String> topics; 
    private final int id; 

    public KafkaHighLevelConsumer(int id, 
         String groupId, 
         List<String> topics,BlockingQueue<String> storyQueue) { 
     this.id = id; 
     this.topics = topics; 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "localhost:9091"); 
     props.put("group.id", groupId); 
     props.put("key.deserializer", StringDeserializer.class.getName()); 
     props.put("value.deserializer", StringDeserializer.class.getName()); 
     this.consumer = new KafkaConsumer<>(props); 
    } 

    @Override 
    public void run() { 
     try { 
      consumer.subscribe(topics); 

      while (true) { 
       ConsumerRecords<String, String> records = consumer.poll(100); 
       for (ConsumerRecord<String, String> record : records) { 
        Map<String, Object> data = new HashMap<>(); 
        data.put("partition", record.partition()); 
        data.put("offset", record.offset()); 
        data.put("value", record.value()); 
        System.out.println(this.id + ": " + data); 
       } 
      } 
     } catch (WakeupException e) { 
      // ignore for shutdown 
     }finally { 
      consumer.close(); 
     } 
    } 

    public void shutdown() { 
     consumer.wakeup(); 
    } 
} 

消費者は問題なく動作しますが、消費者の状態を監視する必要があります。 サーバのIPアドレスまたはポート番号が間違っていると、例外が発生します。

ポートを何か正しくないものに変更すると、props.put("bootstrap.servers", "localhost:9091");からprops.put("bootstrap.servers", "localhost:100500");に変わりますが、それでも例外はありません。

私はカフカに正常に接続しているかどうかを知りたいですか?そのようなケースを扱うことは可能ですか?

私は、このようなMavenを使うには、

<dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>0.9.0.1</version> 
     </dependency> 

感謝をDEPS!

+0

kafka-consumer-groups.shを使用してトピックのコンシューマとその状態を表示することができます – MGolovanov

+0

クライアントはブローカが停止している可能性が高いとみなすため、例外はありません。ブローカがオンラインになると、クライアントはブローカに接続します。 –

答えて

0

クライアントのマニュアル(https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html)によると:

それは透過的に永続的な、ライブラリは一時的な障害の違いを見分けることができないカフカクラスタで

をサーバの障害を処理します失敗または誤った設定。したがって、すべての失敗が「再試行可能」であるとみなし、正確にこれを行い、永遠に再試行し、エラーを返すことはありません。

これは、接続の状態をチェックするための回避策です:、返すことが保証されたいくつかの情報を求めるいくつかの合理的な時間を待ち、何も返さないなら、あなたは、接続は、いくつかの問題があることを知っている:

int CONNECTION_TEST_TIMEOUT_SECONDS = 10; // or whatever is appropriate for your environment 

ExecutorService executor = Executors.newSingleThreadExecutor(); 
Runnable testTask = consumer::listTopics; 

Future future = executor.submit(testTask); 
try { 
    future.get(CONNECTION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); 
} catch (TimeoutException te) { 
    consumer.wakeup(); 
    throw new IOException("Could not communicate with the server within " + CONNECTION_TEST_TIMEOUT_SECONDS + " seconds"); 
} catch (InterruptedException e) { 
    // Nothing to do. Maybe a warning in the log? 
} catch (ExecutionException e) { 
    throw new IOException("Exception while running connection test: " + e.getMessage(), e); 
} 
関連する問題