2017-09-12 16 views
0

1人のプロデューサ& 1つのコンシューマ& 1つのパーティション。コンシューマ/プロデューサはどちらもスプリングブートアプリケーションです。コンシューマーアプリはローカルマシン上で実行され、プロデューサーはkafka &飼い飼い子機と共にリモートマシン上で実行されます。kafkaプロデューサ/コンシューマの再起動後にコンシューマがメッセージを受信しない

開発中に、プロデューサアプリケーションをいくつか変更して再デプロイしました。しかしその後、私の消費者はメッセージを受け取っていません。私は消費者を再開しようとしましたが、運はありません。何が問題になりうるのか、どのように解決できるのか?

消費者設定:

spring: 
    cloud: 
    stream: 
     defaultBinder: kafka 
     bindings: 
     input: 
      destination: sales 
      content-type: application/json 
     kafka: 
     binder: 
      brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1} 
      zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1} 
      defaultZkPort: 2181 
      defaultBrokerPort: 9092 
server: 
    port: 0 

プロデューサーコンフィグ

cloud: 
stream: 
    defaultBinder: kafka 
    bindings: 
    output: 
     destination: sales 
     content-type: application/json 
    kafka: 
    binder: 
     brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1} 
     zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1} 
     defaultZkPort: 2181 
     defaultBrokerPort: 9092 

EDIT2

5分後に、消費者のアプリは、以下の例外を除いて死ぬ:

2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder 
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 
2017-09-12 18:14:47,255 WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder: 
2017-09-12 18:14:47,256 INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown 
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans 
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input 
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel 
2017-09-12 18:14:47,258 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel 
+0

かなりシンプルなシナリオのような音です。そのアプリケーションをGitHubのどこかで共有して問題をローカルで再現できるようにしますか? –

+0

@ArtemBilan申し訳ありませんが、自分のコードを共有することはできません。ソリューションの提案にはどのような詳細が必要ですか? – LazyTechie

+0

私はコードなしのアイデアはありません。おそらく、コンシューマとプロデューサの設定を共有できますか?そして、はい、私はあなたがアプリケーション全体を共有することはできませんが、少なくとも、私たちのためのいくつかの簡単なスプリングブートアプリケーションを思い付くかもしれないことを知っています... –

答えて

0

これはすでにbugがと報告されているように見えますが、resetOffsetというプロパティは無効です。したがって、消費者は常に、オフセットがlatestのメッセージを要求しました。

gitの問題で説明したように、唯一の回避策は、kafkaコンシューマのCLIツールでこれを修正することです。

1

DEBUGについての上記の提案がさらに詳しい情報を示しているかどうかを確認してください。あなたがKafkaTopicProvisionerからタイムアウト例外をいくつか受け取っているようです。しかし、それはあなたが私が推測する消費者を再起動するときに起こります。消費者が何とかブローカーとやりとりするのに問題があるようで、あなたは何が起こっているのかを知る必要があるようです。

関連する問題