0

私は、あるトピックに加入しているkafkaspoutsのグループを作成する方法について、しばらくの間、多くのことを試してきました。私はspoutconfigのidを引用しているいくつかのソースがグループidとして使用できることを発見しました。私の主な問題は、スパウトがグループとして行動したかどうかをどのように知っているかです。私はまた、setSpout()のparalellism_hintがスパラスのparalellism_hint数のグループを作成するものかどうか疑問に思います。私を教えてください。コンシューマーグループIDを持つKafkaSpoutを作成する

私のコードは、今2つの注ぎ口は私がグループとして(単語数-注ぎ口、total_word_count-注ぎ口)行為を作成した場合どのように私は知っています。この

private KafkaSpout buildKafkaSpout(String zkTopic, String zkRoot, String groupId) { 

     String zkConnString = "localhost:2181"; 
     BrokerHosts hosts = new ZkHosts(zkConnString); 
     SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, zkTopic,zkRoot, groupId); 
     kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
     KafkaSpout kafkaSpout = new KafkaSpout(kafkaSpoutConfig); 
     return kafkaSpout; 
    } 

private void buildTopologyForGroupOne(TopologyBuilder builder) { 

     String zkTopic = "topic1"; 
     String zkRoot = "/topic1"; 
     String groupId = "group1"; 
     List<String> zkSpoutIds = new ArrayList<String>(); 
     zkSpoutIds.add("word_count-spout"); 
     zkSpoutIds.add("total_word_count-spout"); 

     for(String spoutId:zkSpoutIds){ 
      KafkaSpout kafkaSpout = buildKafkaSpout(zkTopic, zkRoot, groupId); 
      builder.setSpout(spoutId.concat("_"+groupId), kafkaSpout,2); 
     } 
     builder.setBolt("word_split-bolt_group_1",new SplitBolt()).shuffleGrouping("word_count-spout"+"_"+groupId); 
     builder.setBolt("split_count-bolt_group_1",new CountBolt()).shuffleGrouping("word_split-bolt_group_1"); 
     builder.setBolt("total_word_count-bolt_group_1",new TotalWordCountBolt()).shuffleGrouping("total_word_count-spout"+"_"+groupId); 

    } 

のようなものです。グループとして活動することによって、私は、新しい注ぎ口が作成された場合、飼い主がパーティションを再配置することを意味します。事前に

おかげ

+0

最後にソースが見つかりました。コンシューマIDを持つKafkaSpoutは、新しいバージョンのstorm apache-storm-1.0.2を使用して作成できます。 – Sushmitha

答えて

0

嵐は、カフカが代わりに高レベルの消費者向けAPIのSimpleConsumerをemployes、それは、高可用性およびフェイルオーバーを実装するには、この設定を使用して、高レベルの消費者のためにあるので、group.idは重要ではありません。

私は、2つのコンシューマインスタンスが同じzkパスロックのために競合するため、ランタイムでコードに間違ったことが起こると思います。

+0

私の意図は、kafkaSpoutのグループIDを使用しているということです。 – Sushmitha

+0

パフォーマンスを考慮すると、ストームの観点からワーカー番号/エグゼキュータ番号/タスク番号を設定して、より良いスループットを実現できます。 group.idを設定しても、KafkaSpoutには影響しません。それは、あなたが考えるグループIDではなく、オフセットを格納する場所をZKに伝えるクライアントIDでなければなりません。 – amethystic

+0

解説ありがとうございます。私の目的は、1つのスパウトアウトプットを複数のボルトで消費できるようにすることです。そのような実装での問題は、1つのボルトがタプルを確認できなかった場合、タプルはお互いに再生されます吐き出し口でもボルトで止めます。私はこれを避けたいと思います。私に方法を提案することができれば非常に感謝します。解決策を見つけるためにストリーム名を使用しようとしています。 – Sushmitha

関連する問題