2017-10-18 3 views
0

私はStormバージョン1.1.0とKafkaバージョン0.10.1.2を使用しています。Kafka spoutエラー「コンシューマがどのトピックにも登録されていないか、パーティションに割り当てられていません」

次のように私はカフカ-注ぎ口を作成しています:

public KafkaSpout<String, String> getKafkaSpout() { 
    String _kafkaBrokers = (String) props.get("bootstrap.servers"); 
    String _topic = (String) props.get("kafka.topic.name"); 
    String groupId = (String) props.get("group.id"); 
    int maxMsgSize = (int) props.get("fetch.message.max.bytes"); 
    String keySerializer = (String) props.get("key.serializer"); 
    String valueSerializer = (String) props.get("value.serializer"); 

    List<String>topics = new ArrayList<String>(`enter code here`); 
    topics.add(_topic); 

    return new KafkaSpout<String, String (KafkaSpoutConfig.builder(_kafkaBrokers, topics) 
      .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) 
      .setMaxUncommittedOffsets(100) 
      .setProp(ConsumerConfig.GROUP_ID_CONFIG, groupId) 
      .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,maxMsgSize) 
      .setProp("key.serializer",keySerializer) 
      .setProp("value.serializer",valueSerializer) 
      .build()) 
} 

を私は他の依存関係、私は以下のプロジェクトにMavenの依存関係を言及しているとともに、下記のエラー

java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions 
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:973) 
at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:291) 
at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:225) 
at org.apache.storm.daemon.executor$fn__9798$fn__9813$fn__9844.invoke(executor.clj:647) 
at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484) 
at clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745) 

を取得しています

<dependency> 
    <groupId>org.apache.storm</groupId> 
    <artifactId>storm-kafka-client</artifactId> 
    <version>1.1.0.2.6.2.0-205</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.storm</groupId> 
    <artifactId>storm-kafka</artifactId> 
    <version>1.1.0.2.6.2.0-205</version> 
</dependency> 

答えて

0

私はList<String>topics = new ArrayList<String>("enter code here");が問題だと思いますか?おそらくそのリストにあなたのトピック名を書く必要があります。

あなたの依存関係のバージョンは奇妙で、AFAIK Stormはこれらのバージョン文字列で何もリリースしていません。

カフカ> 0.10クラスターのストームカフカクライアントと古いカフカクラスターのストームカフカ(なぜなら最新のカフカとはまだ矛盾していません。思う)。

関連する問題