2017-04-25 1 views
0

私はまだカフカZOOKEPER_AUTO_RESETと疑問を抱いています。同じ質問が重複した質問であれば大丈夫です。kafka zookeperの自動リセットを理解する

私は消費を続けている高水準のJavaコンシューマーを抱えています。 私は複数のトピックを持ち、すべてのトピックには単一のパーティションがあります。

私の懸念事項は以下の通りです。

消費者グループ名が「ncdev1」、ZOOKEPER_AUTO_RESET = smallestのconsumerkafka.jarを開始しました。 initオフセットが-1に設定されていることがわかりました。その後、私はいつか瓶を停止/開始しました。この時点で、消費者グループ(ncdev1)36に割り当てられた最新のオフセットが選択されます。もう一度やり直してから、initoffsetを39に設定します。これは最新の値です。

次に、グループ名をZOOKEPER_GROUP_ID = ncdev2に変更しました。 jarファイルを再起動しましたが、今回もオフセットは-1に設定されています。さらに再起動するには、それは私が次に
ZOOKEPER_AUTO_RESET=largestZOOKEPER_GROUP_ID = ncdev3

を設定するグループ名ncdev3とjarファイルを再起動しようとした後39

すなわち、最新の値に跳ね上がりました。再起動時にオフセットが選択される方法には違いはありません。つまり、再起動すると39が選択されます。これは以前の設定と同じです。

beginning.Any他の構成を形成するオフセットなぜそれが、ピッキングされていない上の任意のアイデアは、それが最初から読むか?(What determines Kafka consumer offset?から最大と最小の理解)事前に

おかげ作るために行われる

コードが追加されました

public class ConsumerForKafka { 
    private final ConsumerConnector consumer; 
    private final String topic; 
    private ExecutorService executor; 
    ServerSocket soketToWrite; 
    Socket s_Accept ; 
    OutputStream s1out ; 
    DataOutputStream dos; 
    static boolean logEnabled ; 
    static File fileName; 


    private static final Logger logger = Logger.getLogger(ConsumerForKafka.class); 


    public ConsumerForKafka(String a_zookeeper, String a_groupId, String a_topic,String session_timeout,String auto_reset,String a_commitEnable) { 
     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
       createConsumerConfig(a_zookeeper, a_groupId,session_timeout,auto_reset,a_commitEnable)); 
     this.topic =a_topic; 
    } 


    public void run(int a_numThreads,String a_zookeeper, String a_topic) throws InterruptedException, IOException { 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, new Integer(a_numThreads)); 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
     String socketURL = PropertyUtils.getProperty("SOCKET_CONNECT_HOST"); 
     int socketPort = Integer.parseInt(PropertyUtils.getProperty("SOCKET_CONNECT_PORT")); 
     Socket socks = new Socket(socketURL,socketPort);   

     //**** 
     String keeper = a_zookeeper; 
     String topic = a_topic; 

     long millis = new java.util.Date().getTime(); 

     //**** 

     PrintWriter outWriter = new PrintWriter(socks.getOutputStream(), true); 

     List<KafkaStream<byte[], byte[]>> streams = null; 
     // now create an object to consume the messages 
     // 
     int threadNumber = 0; 
     // System.out.println("going to forTopic value is "+topic); 
     boolean keepRunningThread =false; 
     boolean chcek = false; 
     logger.info("logged"); 
     BufferedWriter bw = null; 
     FileWriter fw = null; 
     if(logEnabled){ 
      fw = new FileWriter(fileName, true); 
      bw = new BufferedWriter(fw); 
     } 

     for (;;) { 


      streams = consumerMap.get(topic); 
      keepRunningThread =true; 

      for (final KafkaStream stream : streams) { 

       ConsumerIterator<byte[], byte[]> it = stream.iterator(); 

       while(keepRunningThread) 
       { 

       try{ 


        if (it.hasNext()){ 

         if(logEnabled){ 
          String data = new String(it.next().message())+""+"\n"; 
          bw.write(data); 
          bw.flush(); 
          outWriter.print(data); 
          outWriter.flush(); 
          consumer.commitOffsets(); 
          logger.info("Explicit commit ......"); 
         }else{ 

          outWriter.print(new String(it.next().message())+""+"\n"); 
          outWriter.flush(); 
         } 

        } 
        // logger.info("running"); 


       } catch(ConsumerTimeoutException ex) { 

        keepRunningThread =false; 
        break; 

        }catch(NullPointerException npe){ 

         keepRunningThread =true; 
         npe.printStackTrace(); 
        }catch(IllegalStateException ile){ 
         keepRunningThread =true; 
         ile.printStackTrace(); 
        } 

       } 

      } 

     } 
    } 

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId,String session_timeout,String auto_reset,String commitEnable) { 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", a_zookeeper); 
     props.put("group.id", a_groupId); 
     props.put("zookeeper.session.timeout.ms", session_timeout); 
     props.put("zookeeper.sync.time.ms", "2000"); 
     props.put("auto.offset.reset", auto_reset); 
     props.put("auto.commit.interval.ms", "60000"); 
     props.put("consumer.timeout.ms", "30"); 
     props.put("auto.commit.enable",commitEnable); 
     //props.put("rebalance.max.retries", "4"); 


     return new ConsumerConfig(props); 
    } 

    public static void main(String[] args) throws InterruptedException { 

     String zooKeeper = PropertyUtils.getProperty("ZOOKEEPER_URL_PORT"); 
     String groupId = PropertyUtils.getProperty("ZOOKEPER_GROUP_ID"); 
     String session_timeout = PropertyUtils.getProperty("ZOOKEPER_SESSION_TIMOUT_MS"); //6400 
     String auto_reset = PropertyUtils.getProperty("ZOOKEPER_AUTO_RESET"); //smallest 
     String enableLogging = PropertyUtils.getProperty("ENABLE_LOG"); 
     String directoryPath = PropertyUtils.getProperty("LOG_DIRECTORY"); 
     String log4jpath = PropertyUtils.getProperty("LOG_DIR"); 
     String commitEnable = PropertyUtils.getProperty("ZOOKEPER_COMMIT"); //false 
     PropertyConfigurator.configure(log4jpath); 

     String socketURL = PropertyUtils.getProperty("SOCKET_CONNECT_HOST"); 
     int socketPort = Integer.parseInt(PropertyUtils.getProperty("SOCKET_CONNECT_PORT")); 
     try { 
      Socket socks = new Socket(socketURL,socketPort); 
      boolean connected = socks.isConnected() && !socks.isClosed(); 
      if(connected){ 
       //System.out.println("Able to connect "); 
      }else{ 
       logger.info("Not able to conenct to socket ..Exiting..."); 
       System.exit(0); 
      } 
     } catch (UnknownHostException e1) { 
      // TODO Auto-generated catch block 
      e1.printStackTrace(); 
     } catch(java.net.ConnectException cne){ 
      logger.info("Not able to conenct to socket ..Exitring..."); 
      System.exit(0); 
     } 
     catch (IOException e1) { 
      // TODO Auto-generated catch block 
      e1.printStackTrace(); 
     } 

     // String zooKeeper = args[0]; 
     // String groupId = args[1]; 
     String topic = args[0]; 
     int threads = 1; 

     logEnabled = Boolean.parseBoolean(enableLogging); 
     if(logEnabled) 
      createDirectory(topic,directoryPath); 

     ConsumerForKafka example = new ConsumerForKafka(zooKeeper, groupId, topic, session_timeout,auto_reset,commitEnable); 
     try { 
      example.run(threads,zooKeeper,topic); 
     } catch(java.net.ConnectException cne){ 
      cne.printStackTrace(); 
      System.exit(0); 
     } 
     catch (IOException e) { 
      // TODO Auto-generated catch block 

      e.printStackTrace(); 


     } 


    } 

    private static void createDirectory(String topic,String d_Path) { 

     try{ 
     File file = new File(d_Path); 
     if (!file.exists()) { 
      if (file.mkdir()) { 
       logger.info("Directory Created" +file.getPath()); 
      } else { 

       logger.info("Directory Creation failed"); 
      } 
     } 

     fileName = new File(d_Path + topic + ".log"); 
     if (!fileName.exists()) { 
      fileName.createNewFile(); 
     } 



     }catch(IOException IOE){ 
      //logger.info("IOException occured during Directory or During File creation "); 
     } 


    } 
} 
+0

'ZOOKEPER_AUTO_RESET'という名前の設定はありません。 'auto.offset.reset'を意味しますか? – amethystic

+0

はいそれは 'auto.offset.reset'です。間違った設定名を共有するための知識。 –

+0

'auto.offset.reset'を一番早く設定しても、消費者が常に最新のオフセットからデータを読み出すことが分かっている場合は、コードを貼り付けてください。 – amethystic

答えて

0

投稿を慎重に読み直した後、あなたが遭遇したことは、予想どおりになるはずです。

消費者グループ名を「ncdev1」、ZOOKEPER_AUTO_RESET =最小のconsumerkafka.jarを開始しました。 initオフセットが-1に設定されていることがわかりました。その後、私はいつか瓶を停止/開始しました。このとき、それは最初のオフセットやオフセットが範囲外の場合が存在しない場合、最新のオフセットコンシューマ・グループ(ncdev1)に割り当てられ、すなわち36

auto.offset.resetだけを適用選びます。ログには36個のメッセージしかないので、コンシューマ・グループはこれらのレコードをすべてすばやく読み取ることができます。そのため、コンシューマ・グループは再起動するたびに常に最新のオフセットを選択しています。

+0

です。ありがとうございます。commitOffsetsとは何ですか? config 'auto.commit.enable'がfalseに設定されていると明示的にそのコミットオフセットが表示されます。しかし、同じことが正確に働いているかどうかは疑いがありません。例えば私の消費者は36 msgを消費した。それを1つずつ反復しながら、21番目のメッセージを書き込もうとすると、私のTCP接続が閉じられたか、アプリケーションが終了しました。この場合、私のオフセット値はどうなりますか?20になります。 21日から? –

+0

'auto.commit.enable'がfalseに設定されていると、ユーザはオフセットコミットを処理します。どこでもオフセット、データベース、NoSQLストアなどを格納できます。しかし、ZookeeperやKafkaにオフセットを格納したい場合( 'offsets.storage' = kafkaを設定)、' commitOffsets'を手動で呼び出すと便利です。 – amethystic

+0

そのため、オフセットをコミットする必要があり、カフカは消費者の障害の場合に「少なくとも一度」のセマンティクスを約束します。コミットされたオフセットは、実際には「次のメッセージを読む」ことを意味します。 – amethystic

関連する問題