コンフルエント3.0.1プラットフォームを使用し、Kafka-Elasticsearchコネクタを構築しています。このために私はKafkaからデータを取得するためにSinkConnectorとSinkTask(Kafka-connect APIs)を拡張しています。コンフルエントプラットフォームのKafka-Connect APIでmax.poll.recordsを設定する方法
このコードの一部として、一度に100個のレコードしか取得できないように、「max.poll.records」を返すようにSinkConnectorのtaskConfigsメソッドを拡張しています。しかし、それは動作していないと私はすべてのレコードを同時に取得しており、私は規定された時間内にオフセットをコミットしていません。いずれかは、私が「max.poll.records」
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<String, String>();
config.put(ConfigurationConstants.CLUSTER_NAME, clusterName);
config.put(ConfigurationConstants.HOSTS, hosts);
config.put(ConfigurationConstants.BULK_SIZE, bulkSize);
config.put(ConfigurationConstants.IDS, elasticSearchIds);
config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics);
config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish);
config.put(ConfigurationConstants.TYPES, elasticSearchTypes);
config.put("max.poll.records", "100");
configs.add(config);
}
return configs;
}
あなたのニーズを満たす場合、Confluent 3.1(本日リリース)にはElasticsearchシンクコネクタが含まれています。 http://docs.confluent.io/3.1.0/connect/connect-elasticsearch/docs/index.html – shikhar