2017-03-22 10 views
0

iamは私たちのプレイスカラプロジェクトで反応性カフカを開発しました。私たちは5つのトピックを作成しました。これはコンシューマーグループと働き良い人が購読しています。私は 私のコードがある(それが可能である)既存のコンシューマ・グループにこのトピックを追加することができます。反応カフカのサブスクリプションに新しくトピックを追加する

val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer) 
     .withBootstrapServers(bootStrapServer) 
     .withGroupId(groupId).withPollInterval(100 millis) 

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicList)) 
      .groupedWithin(10, 15 seconds) 
      .map({ 
       group => 
       var offSetBatch = CommittableOffsetBatch.empty 
       val sessionList = group.toList.map { eachItem => 
        offSetBatch = offSetBatch.updated(eachItem.committableOffset) 
        Json.parse(eachItem.record.value()).as[cityModel] 
       } 
       processRecords(cityList) 
       offSetBatch 
      }).mapAsync(1)(_.commitScaladsl()) 
      .toMat(Sink.ignore)(Keep.both) 
      .run() 

私たちは、トピックのパターンを与えることができ、消費者の作成に消費者

答えて

0

にトピックを追加することができますどのような方法があります私は問題を解決するSubscriptions.topicPattern( "*。in")

関連する問題