私はカフカ0.10.1.1とストーム1.0.2を使用しています。 kafkaインテグレーションの嵐の文書では、私たちがzookeeperサーバーを使ってkafka spoutを初期化しているので、オフセットはまだzookeeperを使って維持されていることがわかります。 kafkaサーバーを使ってスパウトをブートストラップする方法はありますか?飼育係を使用した嵐のドキュメントカフカ・スパウト・インテグレーション
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
このオプションから 例が正常に動作しているとのメッセージを消費しています。しかし私はkafkamanager uiの消費者グループまたはストームノードを消費者として見ることができませんでした。
代替アプローチがこれです。
KafkaSpoutConfig<String, String> kafkaSpoutConfig = newKafkaSpoutConfig();
KafkaSpout<String, String> spout = new KafkaSpout<>(kafkaSpoutConfig);
private static KafkaSpoutConfig<String, String> newKafkaSpoutConfig() {
Map<String, Object> props = new HashMap<>();
props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, bootstrapServers);
props.put(KafkaSpoutConfig.Consumer.GROUP_ID, GROUP_ID);
props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
String[] topics = new String[1];
topics[0] = topicName;
KafkaSpoutStreams kafkaSpoutStreams =
new KafkaSpoutStreamsNamedTopics.Builder(new Fields("message"), topics).build();
KafkaSpoutTuplesBuilder<String, String> tuplesBuilder =
new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(new TuplesBuilder(topicName)).build();
KafkaSpoutConfig<String, String> spoutConf =
new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, tuplesBuilder).build();
return spoutConf;
}
しかし、この解決策は、kafkaから少数のメッセージを読み込んだ後にCommitFailedExceptionを表示しています。