私はまだカフカZOOKEPER_AUTO_RESETと疑問を抱いています。同じ質問が重複した質問であれば大丈夫です。kafka zookeperの自動リセットを理解する
私は消費を続けている高水準のJavaコンシューマーを抱えています。 私は複数のトピックを持ち、すべてのトピックには単一のパーティションがあります。
私の懸念事項は以下の通りです。
消費者グループ名が「ncdev1」、ZOOKEPER_AUTO_RESET = smallest
のconsumerkafka.jarを開始しました。 initオフセットが-1に設定されていることがわかりました。その後、私はいつか瓶を停止/開始しました。この時点で、消費者グループ(ncdev1)36に割り当てられた最新のオフセットが選択されます。もう一度やり直してから、initoffsetを39に設定します。これは最新の値です。
次に、グループ名をZOOKEPER_GROUP_ID = ncdev2
に変更しました。 jarファイルを再起動しましたが、今回もオフセットは-1に設定されています。さらに再起動するには、それは私が次に
ZOOKEPER_AUTO_RESET=largest
とZOOKEPER_GROUP_ID = ncdev3
を設定するグループ名ncdev3とjarファイルを再起動しようとした後39
すなわち、最新の値に跳ね上がりました。再起動時にオフセットが選択される方法には違いはありません。つまり、再起動すると39が選択されます。これは以前の設定と同じです。
beginning.Any他の構成を形成するオフセットなぜそれが、ピッキングされていない上の任意のアイデアは、それが最初から読むか?(What determines Kafka consumer offset?から最大と最小の理解)事前に
おかげ作るために行われる
コードが追加されました
public class ConsumerForKafka {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
ServerSocket soketToWrite;
Socket s_Accept ;
OutputStream s1out ;
DataOutputStream dos;
static boolean logEnabled ;
static File fileName;
private static final Logger logger = Logger.getLogger(ConsumerForKafka.class);
public ConsumerForKafka(String a_zookeeper, String a_groupId, String a_topic,String session_timeout,String auto_reset,String a_commitEnable) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(a_zookeeper, a_groupId,session_timeout,auto_reset,a_commitEnable));
this.topic =a_topic;
}
public void run(int a_numThreads,String a_zookeeper, String a_topic) throws InterruptedException, IOException {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
String socketURL = PropertyUtils.getProperty("SOCKET_CONNECT_HOST");
int socketPort = Integer.parseInt(PropertyUtils.getProperty("SOCKET_CONNECT_PORT"));
Socket socks = new Socket(socketURL,socketPort);
//****
String keeper = a_zookeeper;
String topic = a_topic;
long millis = new java.util.Date().getTime();
//****
PrintWriter outWriter = new PrintWriter(socks.getOutputStream(), true);
List<KafkaStream<byte[], byte[]>> streams = null;
// now create an object to consume the messages
//
int threadNumber = 0;
// System.out.println("going to forTopic value is "+topic);
boolean keepRunningThread =false;
boolean chcek = false;
logger.info("logged");
BufferedWriter bw = null;
FileWriter fw = null;
if(logEnabled){
fw = new FileWriter(fileName, true);
bw = new BufferedWriter(fw);
}
for (;;) {
streams = consumerMap.get(topic);
keepRunningThread =true;
for (final KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while(keepRunningThread)
{
try{
if (it.hasNext()){
if(logEnabled){
String data = new String(it.next().message())+""+"\n";
bw.write(data);
bw.flush();
outWriter.print(data);
outWriter.flush();
consumer.commitOffsets();
logger.info("Explicit commit ......");
}else{
outWriter.print(new String(it.next().message())+""+"\n");
outWriter.flush();
}
}
// logger.info("running");
} catch(ConsumerTimeoutException ex) {
keepRunningThread =false;
break;
}catch(NullPointerException npe){
keepRunningThread =true;
npe.printStackTrace();
}catch(IllegalStateException ile){
keepRunningThread =true;
ile.printStackTrace();
}
}
}
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId,String session_timeout,String auto_reset,String commitEnable) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", session_timeout);
props.put("zookeeper.sync.time.ms", "2000");
props.put("auto.offset.reset", auto_reset);
props.put("auto.commit.interval.ms", "60000");
props.put("consumer.timeout.ms", "30");
props.put("auto.commit.enable",commitEnable);
//props.put("rebalance.max.retries", "4");
return new ConsumerConfig(props);
}
public static void main(String[] args) throws InterruptedException {
String zooKeeper = PropertyUtils.getProperty("ZOOKEEPER_URL_PORT");
String groupId = PropertyUtils.getProperty("ZOOKEPER_GROUP_ID");
String session_timeout = PropertyUtils.getProperty("ZOOKEPER_SESSION_TIMOUT_MS"); //6400
String auto_reset = PropertyUtils.getProperty("ZOOKEPER_AUTO_RESET"); //smallest
String enableLogging = PropertyUtils.getProperty("ENABLE_LOG");
String directoryPath = PropertyUtils.getProperty("LOG_DIRECTORY");
String log4jpath = PropertyUtils.getProperty("LOG_DIR");
String commitEnable = PropertyUtils.getProperty("ZOOKEPER_COMMIT"); //false
PropertyConfigurator.configure(log4jpath);
String socketURL = PropertyUtils.getProperty("SOCKET_CONNECT_HOST");
int socketPort = Integer.parseInt(PropertyUtils.getProperty("SOCKET_CONNECT_PORT"));
try {
Socket socks = new Socket(socketURL,socketPort);
boolean connected = socks.isConnected() && !socks.isClosed();
if(connected){
//System.out.println("Able to connect ");
}else{
logger.info("Not able to conenct to socket ..Exiting...");
System.exit(0);
}
} catch (UnknownHostException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch(java.net.ConnectException cne){
logger.info("Not able to conenct to socket ..Exitring...");
System.exit(0);
}
catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
// String zooKeeper = args[0];
// String groupId = args[1];
String topic = args[0];
int threads = 1;
logEnabled = Boolean.parseBoolean(enableLogging);
if(logEnabled)
createDirectory(topic,directoryPath);
ConsumerForKafka example = new ConsumerForKafka(zooKeeper, groupId, topic, session_timeout,auto_reset,commitEnable);
try {
example.run(threads,zooKeeper,topic);
} catch(java.net.ConnectException cne){
cne.printStackTrace();
System.exit(0);
}
catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private static void createDirectory(String topic,String d_Path) {
try{
File file = new File(d_Path);
if (!file.exists()) {
if (file.mkdir()) {
logger.info("Directory Created" +file.getPath());
} else {
logger.info("Directory Creation failed");
}
}
fileName = new File(d_Path + topic + ".log");
if (!fileName.exists()) {
fileName.createNewFile();
}
}catch(IOException IOE){
//logger.info("IOException occured during Directory or During File creation ");
}
}
}
'ZOOKEPER_AUTO_RESET'という名前の設定はありません。 'auto.offset.reset'を意味しますか? – amethystic
はいそれは 'auto.offset.reset'です。間違った設定名を共有するための知識。 –
'auto.offset.reset'を一番早く設定しても、消費者が常に最新のオフセットからデータを読み出すことが分かっている場合は、コードを貼り付けてください。 – amethystic