0
iamは私たちのプレイスカラプロジェクトで反応性カフカを開発しました。私たちは5つのトピックを作成しました。これはコンシューマーグループと働き良い人が購読しています。私は 私のコードがある(それが可能である)既存のコンシューマ・グループにこのトピックを追加することができます。反応カフカのサブスクリプションに新しくトピックを追加する
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers(bootStrapServer)
.withGroupId(groupId).withPollInterval(100 millis)
Consumer.committableSource(consumerSettings, Subscriptions.topics(topicList))
.groupedWithin(10, 15 seconds)
.map({
group =>
var offSetBatch = CommittableOffsetBatch.empty
val sessionList = group.toList.map { eachItem =>
offSetBatch = offSetBatch.updated(eachItem.committableOffset)
Json.parse(eachItem.record.value()).as[cityModel]
}
processRecords(cityList)
offSetBatch
}).mapAsync(1)(_.commitScaladsl())
.toMat(Sink.ignore)(Keep.both)
.run()
私たちは、トピックのパターンを与えることができ、消費者の作成に消費者