2017-02-17 3 views
1

私はカフカ0.10.1.1とストーム1.0.2を使用しています。 kafkaインテグレーションの嵐の文書では、私たちがzookeeperサーバーを使ってkafka spoutを初期化しているので、オフセットはまだzookeeperを使って維持されていることがわかります。 kafkaサーバーを使ってスパウトをブートストラップする方法はありますか?飼育係を使用した嵐のドキュメントカフカ・スパウト・インテグレーション

BrokerHosts hosts = new ZkHosts(zkConnString); 
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString()); 
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); 

このオプションから 例が正常に動作しているとのメッセージを消費しています。しかし私はkafkamanager uiの消費者グループまたはストームノードを消費者として見ることができませんでした。

代替アプローチがこれです。

KafkaSpoutConfig<String, String> kafkaSpoutConfig = newKafkaSpoutConfig(); 

KafkaSpout<String, String> spout = new KafkaSpout<>(kafkaSpoutConfig); 

private static KafkaSpoutConfig<String, String> newKafkaSpoutConfig() { 

     Map<String, Object> props = new HashMap<>(); 
     props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, bootstrapServers); 
     props.put(KafkaSpoutConfig.Consumer.GROUP_ID, GROUP_ID); 
     props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, 
       "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, 
       "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true"); 

     String[] topics = new String[1]; 
     topics[0] = topicName; 

     KafkaSpoutStreams kafkaSpoutStreams = 
       new KafkaSpoutStreamsNamedTopics.Builder(new Fields("message"), topics).build(); 

     KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = 
       new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(new TuplesBuilder(topicName)).build(); 

     KafkaSpoutConfig<String, String> spoutConf = 
       new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, tuplesBuilder).build(); 

     return spoutConf; 
    } 

しかし、この解決策は、kafkaから少数のメッセージを読み込んだ後にCommitFailedExceptionを表示しています。

答えて

0

ストーム・カフカは、一般的なカフカ・クライアントを使用して、動物園で異なる場所と異なるフォーマットで消費者情報を書き出します。だからあなたはkafkamanager uiでそれを見ることはできません。

https://github.com/keenlabs/capillaryのような他のモニタツールがあります。

関連する問題