2017-02-01 10 views
0

要件は、我々はここでそれを達成することができ、サーバ .Howを再起動せずにトピックランタイムで以下の属性を設定することです。 現在、プロパティファイルから値を読み取っていますが、ここでは変更を反映するためにサーバーを再起動する必要があります。は、実行時にカフカの消費者のトピックに動的な値を渡すメッセージ駆動チャネルアダプタ用

例: sample.properties = topic1

topic.list、topic2

(展開ディレクトリ内)と、サーバの再起動なしで、将来的にtopic3から消費したいと思います。

トピック最終変数であることがわかります。

は、ファイルシステムパス(デプロイメントディレクトリの外側)からキー(topic.list)を読み取ろうとしましたが、運がうまくいきませんでした。

提案があります。

あなたが動的にオンデマンドで追加トピック用のアダプタを追加するには、Java DSLを使用することができます
<int-kafka:message-driven-channel-adapter 

       id="inAdapter" 
       channel="fromKafka" 
       connection-factory="connectionFactory" 
       key-decoder="kafkaKeyDecoder" 
       payload-decoder="kafkaDecoder"        
       topics="${topic.list}" 
       offset-manager="offsetManager"/> 
+0

どのくらいの頻度でトピックを変更しますか?しかし、ちょっとしたことがありますが(外部の介入も必要です)(アクションを引き起こすためのAPIコールになる可能性があります) – iamiddy

+0

非常に頻繁です。しかし、それは必須条件ですフレキシビリティを持たせることが大切です。この方向に私を導いてください。 – sam

+0

この概念をここで理解しようとすると、https://github.com/spring-projects/spring-integration-samples/tree/master/advanced/dynamic-ftp次に、あなたのユースケースに合わせて簡単にカスタマイズすることができます。あなたはフォローアップをしていますか? – iamiddy

答えて

1

...

@Autowired 
private IntegrationFlowContext flowContext; 

public void addAnotherListenerForTopics(String... topics) { 
    IntegrationFlow flow = 
     IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory(), topics)) 
      .channel("fromKafka") 
      .get(); 
    this.flowContext.registration(flow).register(); 
} 

bean.addAnotherListenerForTopics("added.new"); 

ポンポン:

<dependency> 
    <groupId>org.springframework.integration</groupId> 
    <artifactId>spring-integration-java-dsl</artifactId> 
    <version>1.2.1.RELEASE</version> 
</dependency> 

注意ブローカパーティションを次のように使用している場合新しいコンテナは、既存の割り当ての取り消しを避けるために、異なるグループIDを必要とします。

+0

このソリューションの背後にある理論をより詳細な説明の中で説明してください。 –

+0

私は、あなたが理解しづらいことが何であるか分かりません。動的フロー登録は、[このブログの投稿](https://spring.io/blog/2016/09/27/java-dsl-for-spring-integration-1-2-release-candidate-1-is-利用可能)。必要なものがあれば新しい質問をしてください。 –

+0

これを見てください - https://stackoverflow.com/questions/46400329/how-to-pass-topics-dynamically-to-a-kafka-listener –

関連する問題