2016-11-17 27 views
0

私はKafkaStreamsKafkaConnectで遊んでいます。単にトピックからのメッセージを消費しようとしています。私はこのトピックのために設定された "標準"バッチ消費者を持っており、それは魅力的なように機能します。私は最初にカフカにいくつかのレコードを送り、後にそれらを消費する。今私はKakfaストリームを使って同じことをしたいですが、私はその話題から一つのメッセージを得ることはできません。私が使用しているコンシューマーコードは次のとおりです。KafkaStreamはトピックからのメッセージを受信して​​いません

final int NUMBER_OF_PARTITIONS = 4; 
final Properties consumerConfig = new Properties(); 
consumerConfig.setProperty("zookeeper.connect", RULE.getConfiguration().kafka.getZookeeperUrl()); 
consumerConfig.setProperty("backoff.increment.ms", "100"); 
consumerConfig.setProperty("group.id", "java-consumer-example"); 
consumerConfig.setProperty("consumer.timeout.ms", "1000000"); 
consumerConfig.setProperty("client.id", "someclient"); 
consumerConfig.setProperty("auto.offset.reset", "smallest"); 
consumerConfig.setProperty("enable.auto.commit", "false"); 
consumerConfig.setProperty("bootstrap.servers", RULE.getConfiguration().kafka.getHosts()); 

final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig)); 
final TopicFilter sourceTopicFilter = new Whitelist(RULE.getConfiguration().kafka.getTopic()); 

final VerifiableProperties decoderProps = new VerifiableProperties(); 
decoderProps.props().setProperty("schema.registry.url", RULE.getConfiguration().kafka.getRegistry()); 
decoderProps.props().setProperty("max.schemas.per.subject", "1"); 
final List<KafkaStream<String, Object>> streams = connector 
    .createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_PARTITIONS, new StringDecoder(decoderProps), new KafkaAvroDecoder(decoderProps)); 

final ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PARTITIONS); 
for (final KafkaStream stream : streams) { 
    executorService.submit(() -> { 
     try { 
      final ConsumerIterator it = stream.iterator(); 
      while (it.hasNext()) { 
       final MessageAndMetadata messageAndMetadata = it.next(); 
       final String key = (String) messageAndMetadata.key(); 
       System.out.println("KEY" + key); 
      } 
     } catch (final Exception ex) { 
      LOGGER.error("ERROR", ex); 
     } 
    }); 
} 

私の問題は、私のコードは、タイムアウトになるまでit.hasNext()状態で待機し続けること、です。私はおそらくここでいくつかの詳細が不足しているかもしれませんが、なぜ私は話題から何も得られないのか分かりません。このテストの一環として、消費者が始まる直前にこのトピックに数多くのレコードを送信するプロデューサーがいるため、オフセット問題にはなりません。どんなアイデアも大歓迎です。

答えて

0

解決策が見つかりました。エラーは私が投稿したコードの外でした。 ExecutorServiceのシャットダウンのために私が提供したタイムアウトは短すぎるため、コンシューマージョブを実行するのに十分な時間を提供することなくそれを強制終了しました。

関連する問題