私たちは、クラスタ内のノードによって生成され、クラスタ内のすべてのノードに配信されるメッセージを格納するためにKafkaを使用しています。 akka-streamsしかし、私はこれを結びつけなければならないいくつかの質問があります。これにはいくつかの制約があります。Kafkaに関する質問複数のノードでAkka-Streamsを経由する過渡メッセージのコンシューマ
まず、クラスタ内のすべてのノードでメッセージを消費する必要がありますが、生成するメッセージは1つのノードだけです。私は各ノードにおそらくそのノードID、つまり各ノードがメッセージを受け取ることを意味するグループIDを割り当てることができると理解しています。それは並べ替えられた。しかしここに質問があります。
データは非常に一時的でかなり大きく(1メガの直下)、さらに圧縮したり分割したりすることはできません。トピックに新しいメッセージがある場合、古いメッセージはかなりゴミです。どのようにしてトピックを基本的に現在最大の1つのメッセージに制限することができますか?
ノードを開始するためにデータが必要であることを前提として、私はそれを以前に消費したかどうかにかかわらず、最新のメッセージを消費する必要があります。これが可能かどうか、もしそうなら、どのようにそれを行うことができますか?
最後に、データは通常トピックにありますが、時にはそこには存在せず、理想的にはそこにメッセージがあるかどうかを確認する必要があります。これは可能ですか?
これは、私は現在、消費者を開始するために使用していたコードです:
private Control startMatrixConsumer() {
final ConsumerSettings<Long, byte[]> matrixConsumerSettings = ConsumerSettings
.create(services.actorSystem(), new LongDeserializer(), new ByteArrayDeserializer())
.withBootstrapServers(services.config().getString("kafka.bootstrapServers"))
.withGroupId("group1") // todo put in the conf ??
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
final String topicName = Matrix.class.getSimpleName() + '-' + eventId;
final AutoSubscription subscription = Subscriptions.topics(topicName);
return Consumer.plainSource(MatrixConsumerSettings, subscription)
.named(Matrix.class.getSimpleName() + "-Kafka-Consumer-" + eventId)
.map(data -> {
final Matrix matrix = services.kryoDeserialize(data.value(), Matrix.class);
log.debug(format("Received %s for event %d from Kafka", Matrix.class.getSimpleName(), matrix.getEventId()));
return matrix;
})
.filter(Objects::nonNull)
.to(Sink.actorRef(getSelf(), NotUsed.getInstance()))
.run(ActorMaterializer.create(getContext()));
}
おかげでたくさん。