2017-12-07 2 views
0

getMessages()以下のメソッド時にははカフカトピックのすべてのメッセージを取得します。このコードは、ページの読み込み時にWebアプリケーションで実行されます。メッセージが戻ってこない場合もあり、すべてのメッセージが戻ってくることもあります。KafkaConsumer経由ですべてのカフカトピックメッセージを確実に取得する方法

プロパティを設定したり、コードを変更してすべてのメッセージが毎回戻るようにする方法はありますか?

public List<String> getMessages() { 
    List<String> messages = new ArrayList<>(); 
    try { 
     ConnectionKafka connection = ConstantsHome.connectionManager.getConnectionDef(getGuid(), ConnectionKafka.class); 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", connection.getProps().get("bootstrapServers")); 
     props.put("group.id", getName()); 
     props.put("auto.offset.reset", "earliest"); 
     props.put("enable.auto.commit", "true"); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
     consumer.subscribe(Collections.singleton(getName())); 
     consumer.poll(0); 
     consumer.seekToBeginning(consumer.assignment()); 
     ConsumerRecords<String, String> records = consumer.poll(0); 
     for (ConsumerRecord<String, String> record : records) { 
      messages.add(
       String.format("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()) 
      ); 
     } 
     consumer.close(0, TimeUnit.MILLISECONDS); 
    } catch (Exception e) { 
     Utils.writeToLog(e, getClass().getName(), "", IErrorManager.ERROR); 
    } 
    Collections.sort(messages, new Comparator<String>() { 
     @Override 
     public int compare(String o1, String o2) { 
      return Integer.valueOf(o1.substring("offset = ".length(), o1.indexOf(","))) - 
      Integer.valueOf(o2.substring("offset = ".length(), o2.indexOf(","))); 
     } 
    }); 
    return messages; 
} 

答えて

1

あなたの期待は、各コールのすべてのメッセージを取得している場合は、次のように設定する必要があり、他のオプションは、反復ごとに動的グループIDを作成しているpropertly

enable.auto.commit = false 

、私はこれを避けるだろうグループメタデータがカフカ側に格納されていることを考慮して選択します。

+0

ok - "enable.auto.commit"、 "false"を試します。 –

+0

は動作しているようです - ありがとうございます。 kafkaのJava APIが薄れていることが証明されていますが、おそらくこれがトリックです –

関連する問題