私は、あるトピックに加入しているkafkaspoutsのグループを作成する方法について、しばらくの間、多くのことを試してきました。私はspoutconfigのidを引用しているいくつかのソースがグループidとして使用できることを発見しました。私の主な問題は、スパウトがグループとして行動したかどうかをどのように知っているかです。私はまた、setSpout()
のparalellism_hintがスパラスのparalellism_hint数のグループを作成するものかどうか疑問に思います。私を教えてください。コンシューマーグループIDを持つKafkaSpoutを作成する
私のコードは、今2つの注ぎ口は私がグループとして(単語数-注ぎ口、total_word_count-注ぎ口)行為を作成した場合どのように私は知っています。この
private KafkaSpout buildKafkaSpout(String zkTopic, String zkRoot, String groupId) {
String zkConnString = "localhost:2181";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, zkTopic,zkRoot, groupId);
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(kafkaSpoutConfig);
return kafkaSpout;
}
private void buildTopologyForGroupOne(TopologyBuilder builder) {
String zkTopic = "topic1";
String zkRoot = "/topic1";
String groupId = "group1";
List<String> zkSpoutIds = new ArrayList<String>();
zkSpoutIds.add("word_count-spout");
zkSpoutIds.add("total_word_count-spout");
for(String spoutId:zkSpoutIds){
KafkaSpout kafkaSpout = buildKafkaSpout(zkTopic, zkRoot, groupId);
builder.setSpout(spoutId.concat("_"+groupId), kafkaSpout,2);
}
builder.setBolt("word_split-bolt_group_1",new SplitBolt()).shuffleGrouping("word_count-spout"+"_"+groupId);
builder.setBolt("split_count-bolt_group_1",new CountBolt()).shuffleGrouping("word_split-bolt_group_1");
builder.setBolt("total_word_count-bolt_group_1",new TotalWordCountBolt()).shuffleGrouping("total_word_count-spout"+"_"+groupId);
}
のようなものです。グループとして活動することによって、私は、新しい注ぎ口が作成された場合、飼い主がパーティションを再配置することを意味します。事前に
おかげ
最後にソースが見つかりました。コンシューマIDを持つKafkaSpoutは、新しいバージョンのstorm apache-storm-1.0.2を使用して作成できます。 – Sushmitha