2017-04-12 16 views
0

高水準消費者のライフサイクルの説明が見つかりません。私は0.8.2.2にあり、カフカクライアントからの「現代的な」消費者は使用できません。ここに私のコードです:カフカの高水準消費者APIライフサイクル0.8

def consume(numberOfEvents: Int, await: Duration = 100.millis): List[MessageEnvelope] = { 
    val consumerProperties = new Properties() 
    consumerProperties.put("zookeeper.connect", kafkaConfig.zooKeeperConnectString) 
    consumerProperties.put("group.id", consumerGroup) 
    consumerProperties.put("auto.offset.reset", "smallest") 

    val consumer = Consumer.create(new ConsumerConfig(consumerProperties)) 

    try { 
     val messageStreams = consumer.createMessageStreams(
     Predef.Map(kafkaConfig.topic -> 1), 
     new DefaultDecoder, 
     new MessageEnvelopeDecoder) 

     val receiveMessageFuture = Future[List[MessageEnvelope]] { 
     messageStreams(kafkaConfig.topic) 
      .flatMap(stream => stream.take(numberOfEvents).map(_.message())) 
     } 

     Await.result(receiveMessageFuture, await) 
    } finally { 
     consumer.shutdown() 
    } 

私には分かりません。各メッセージの取得後にコンシューマをシャットダウンするか、インスタンスを保持してメッセージをフェッチするために再利用する必要がありますか?インスタンスを再利用するのは正しい方法だと思いますが、いくつかの記事やベストプラクティスを見つけることはできません。

私は消費者および/またはメッセージストリームを再利用しようとしています。それは私のためにうまく動作せず、私はそれの理由を見つけることができません。

2017-04-17_19:57:57.088 ERROR MessageEnvelopeConsumer - Error while awaiting for messages java.lang.IllegalStateException: Iterator is in failed state 
java.lang.IllegalStateException: Iterator is in failed state 
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54) 
    at scala.collection.IterableLike$class.take(IterableLike.scala:134) 
    at kafka.consumer.KafkaStream.take(KafkaStream.scala:25) 

ここで行われます:私はmessageStreamsを再利用しようとした場合

、私は例外を取得

運...

def consume(numberOfEvents: Int, await: Duration = 100.millis): List[MessageEnvelope] = { 
    try { 
     val receiveMessageFuture = Future[List[MessageEnvelope]] { 
     messageStreams(kafkaConfig.topic) 
      .flatMap(stream => stream.take(numberOfEvents).map(_.message())) 
     } 
     Try(Await.result(receiveMessageFuture, await)) match { 
     case Success(result) => result 
     case Failure(_: TimeoutException) => List.empty 
     case Failure(e) => 
      // ===> never got any message from topic 
      logger.error(s"Error while awaiting for messages ${e.getClass.getName}: ${e.getMessage}", e) 
      List.empty 

     } 
    } catch { 
     case e: Exception => 
     logger.warn(s"Error while consuming messages", e) 
     List.empty 
    } 
    } 

私はmessageStreamsを毎回作成しようとしました

2017-04-17_20:02:44.236 WARN MessageEnvelopeConsumer - Error while consuming messages 
kafka.common.MessageStreamsExistException: ZookeeperConsumerConnector can create message streams at most once 
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:151) 
    at MessageEnvelopeConsumer.consume(MessageEnvelopeConsumer.scala:47) 

ハここppens:

def consume(numberOfEvents: Int, await: Duration = 100.millis): List[MessageEnvelope] = { 
    try { 

     val messageStreams = consumer.createMessageStreams(
     Predef.Map(kafkaConfig.topic -> 1), 
     new DefaultDecoder, 
     new MessageEnvelopeDecoder) 

     val receiveMessageFuture = Future[List[MessageEnvelope]] { 
     messageStreams(kafkaConfig.topic) 
      .flatMap(stream => stream.take(numberOfEvents).map(_.message())) 
     } 
     Try(Await.result(receiveMessageFuture, await)) match { 
     case Success(result) => result 
     case Failure(_: TimeoutException) => List.empty 
     case Failure(e) => 
      logger.error(s"Error while awaiting for messages ${e.getClass.getName}: ${e.getMessage}", e) 
      List.empty 

     } 
    } catch { 
     case e: Exception => 
     // ===> now exception raised here 
     logger.warn(s"Error while consuming messages", e) 
     List.empty 
    } 
    } 

UPD

私は、反復子ベースのアプローチを使用していました。それはこのようになります:

// consumerProperties.put("consumer.timeout.ms", "100")  

private lazy val consumer: ConsumerConnector = Consumer.create(new ConsumerConfig(consumerProperties)) 

    private lazy val messageStreams: Seq[KafkaStream[Array[Byte], MessageEnvelope]] = 
    consumer.createMessageStreamsByFilter(Whitelist(kafkaConfig.topic), 1, new DefaultDecoder, new MessageEnvelopeDecoder) 


    private lazy val iterator: ConsumerIterator[Array[Byte], MessageEnvelope] = { 
    val stream = messageStreams.head 
    stream.iterator() 
    } 

    def consume(): List[MessageEnvelope] = { 
    try { 
     if (iterator.hasNext) { 
     val fromKafka: MessageAndMetadata[Array[Byte], MessageEnvelope] = iterator.next 
     List(fromKafka.message()) 
     } else { 
     List.empty 
     } 

    } catch { 
     case _: ConsumerTimeoutException => 
     List.empty 

     case e: Exception => 
     logger.warn(s"Error while consuming messages", e) 
     List.empty 
    } 
    } 

は今、私はそれが自動的にZKにオフセットをコミットするかどうかを把握しようとしている...

答えて

0

私の回答は最新の質問です。イテレータのアプローチは私のために期待通りに機能します。

1

定数シャットダウンが不要なコンシューマ・グループがパフォーマンスに多くの影響を与えているリバランスが発生します。ベストプラクティスについてはこちらの記事をご覧ください:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

+0

こんにちは、あなたのアプローチを実装しようとしました。他の問題に会った。私の更新を確認できますか? – Sergey

+0

したがって、コンシューマ、メッセージストリーム、メッセージを取得してコンシューマを閉じるときのみ、「期待どおり」(メッセージがフェッチされる)として動作します。 – Sergey

関連する問題