2017-05-19 15 views
1

私はカフカとストームに問題があります。私が設定しているKafkaSpoutの設定に問題がある場合、または私が正しく肯定応答していないかどうか、この時点ではわかりません。なぜApache Storm KafkaSpoutはカフカのトピックから非常に多くのアイテムを放出していますか?

私は50個のアイテムを私のカフカトピックにキューイングしましたが、私のスパウトは1300個以上のタプルを放出しました。また、Spoutは、ほとんどすべてが「失敗」していると報告しています。トポロジは、実際にそれが成功したデータベースへの書き込みだ、失敗されていないが、それは明らかに、すべてを再生している理由(それはそれでやっているものだならば)私はちょうど

大きな問題があるので、あまり知らない:

なぜ私は50をカフカに渡したときに非常に多くのタプルを放出していますか?ここで

enter image description here

私はトポロジーとKafkaSpout

public static void main(String[] args) { 
    try { 
     String databaseServerIP = ""; 
     String kafkaZookeepers = ""; 
     String kafkaTopicName = ""; 
     int numWorkers = 1; 
     int numAckers = 1; 
     int numSpouts = 1; 
     int numBolts = 1; 
     int messageTimeOut = 10; 
     String topologyName = ""; 

     if (args == null || args[0].isEmpty()) { 
     System.out.println("Args cannot be null or empty. Exiting"); 
     return; 
     } else { 
     if (args.length == 8) { 
      for (String arg : args) { 
      if (arg == null) { 
       System.out.println("Parameters cannot be null. Exiting"); 
       return; 
      } 
      } 
      databaseServerIP = args[0]; 
      kafkaZookeepers = args[1]; 
      kafkaTopicName = args[2]; 
      numWorkers = Integer.valueOf(args[3]); 
      numAckers = Integer.valueOf(args[4]); 
      numSpouts = Integer.valueOf(args[5]); 
      numBolts = Integer.valueOf(args[6]); 
      topologyName = args[7]; 
     } else { 
      System.out.println("Bad parameters: found " + args.length + ", required = 8"); 
      return; 
     } 
     } 

     Config conf = new Config(); 

     conf.setNumWorkers(numWorkers); 
     conf.setNumAckers(numAckers); 
     conf.setMessageTimeoutSecs(messageTimeOut); 

     conf.put("databaseServerIP", databaseServerIP); 
     conf.put("kafkaZookeepers", kafkaZookeepers); 
     conf.put("kafkaTopicName", kafkaTopicName); 

     /** 
     * Now would put kafkaSpout instance below instead of TemplateSpout() 
     */ 
     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout(topologyName + "-flatItems-from-kafka-spout", getKafkaSpout(kafkaZookeepers, kafkaTopicName), numSpouts); 
     builder.setBolt(topologyName + "-flatItem-Writer-Bolt", new ItemWriterBolt(), numBolts).shuffleGrouping(topologyName + "-flatItems-from-kafka-spout"); 


     StormTopology topology = builder.createTopology(); 

     StormSubmitter.submitTopology(topologyName, conf, topology); 

    } catch (Exception e) { 
     System.out.println("There was a problem starting the topology. Check parameters."); 
     e.printStackTrace(); 
    } 
    } 

    private static KafkaSpout getKafkaSpout(String zkHosts, String topic) throws Exception { 

    //String topic = "FLAT-ITEMS"; 
    String zkNode = "/" + topic + "-subscriber-pipeline"; 
    String zkSpoutId = topic + "subscriberpipeline"; 
    KafkaTopicInZkCreator.createTopic(topic, zkHosts); 


    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zkHosts), topic, zkNode, zkSpoutId); 
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); 

    // spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true; 
    //spoutConfig.startOffsetTime = System.currentTimeMillis(); 
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 

    return new KafkaSpout(spoutConfig); 

    } 

を設定していますし、ここにあなたがする必要がある

public static void createTopic(String topicName, String zookeeperHosts) throws Exception { 
    ZkClient zkClient = null; 
    ZkUtils zkUtils = null; 
    try { 

     int sessionTimeOutInMs = 15 * 1000; // 15 secs 
     int connectionTimeOutInMs = 10 * 1000; // 10 secs 

     zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$); 
     zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); 

     int noOfPartitions = 1; 
     int noOfReplication = 1; 
     Properties topicConfiguration = new Properties(); 

     boolean topicExists = AdminUtils.topicExists(zkUtils, topicName); 
     if (!topicExists) { 
     AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration, RackAwareMode.Disabled$.MODULE$); 
     } 
    } catch (Exception ex) { 
     ex.printStackTrace(); 
    } finally { 
     if (zkClient != null) { 
     zkClient.close(); 
     } 
    } 
    } 

答えて

1

を大事場合のトピックの作成がどのようにありますボルトのメッセージが失敗したかどうかを確認してください。

もしそれらがすべて失敗しても、あなたはおそらくボルトでメッセージを受け取っていないか、ボルトコードに例外があります。

ボルトメッセージが承認された場合、タイムアウトが発生する可能性が高くなります。トポロジータイムアウトの設定を大きくするか、パラライジムで問題を解決する必要があります。

+0

ありがとうございました。ボルトを掴む正しい方法は何ですか?どのようにトポロジタイムアウトを増やすのですか? – markg

+0

@markg BaseBasicBoltを使用している場合、ackを処理する必要はありません。 BaseRichBoltを使用して、executeメソッドでack()を呼び出す必要があります。 – Solo

+0

@markgトポロジタイムアウトは「topology.message.timeout」設定です。トポロジコードまたはスーパーバイザのstorm.yamlに設定できます – Solo

関連する問題