2017-12-06 14 views
0

私たちは、クラスタ内のノードによって生成され、クラスタ内のすべてのノードに配信されるメッセージを格納するためにKafkaを使用しています。 akka-streamsしかし、私はこれを結びつけなければならないいくつかの質問があります。これにはいくつかの制約があります。Kafkaに関する質問複数のノードでAkka-Streamsを経由する過渡メッセージのコンシューマ

まず、クラスタ内のすべてのノードでメッセージを消費する必要がありますが、生成するメッセージは1つのノードだけです。私は各ノードにおそらくそのノードID、つまり各ノードがメッセージを受け取ることを意味するグループIDを割り当てることができると理解しています。それは並べ替えられた。しかしここに質問があります。

データは非常に一時的でかなり大きく(1メガの直下)、さらに圧縮したり分割したりすることはできません。トピックに新しいメッセージがある場合、古いメッセージはかなりゴミです。どのようにしてトピックを基本的に現在最大の1つのメッセージに制限することができますか?

ノードを開始するためにデータが必要であることを前提として、私はそれを以前に消費したかどうかにかかわらず、最新のメッセージを消費する必要があります。これが可能かどうか、もしそうなら、どのようにそれを行うことができますか?

最後に、データは通常トピックにありますが、時にはそこには存在せず、理想的にはそこにメッセージがあるかどうかを確認する必要があります。これは可能ですか?

これは、私は現在、消費者を開始するために使用していたコードです:

private Control startMatrixConsumer() { 
    final ConsumerSettings<Long, byte[]> matrixConsumerSettings = ConsumerSettings 
     .create(services.actorSystem(), new LongDeserializer(), new ByteArrayDeserializer()) 
     .withBootstrapServers(services.config().getString("kafka.bootstrapServers")) 
     .withGroupId("group1") // todo put in the conf ?? 
     .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); 
    final String topicName = Matrix.class.getSimpleName() + '-' + eventId; 
    final AutoSubscription subscription = Subscriptions.topics(topicName); 
    return Consumer.plainSource(MatrixConsumerSettings, subscription) 
     .named(Matrix.class.getSimpleName() + "-Kafka-Consumer-" + eventId) 
     .map(data -> { 
      final Matrix matrix = services.kryoDeserialize(data.value(), Matrix.class); 
      log.debug(format("Received %s for event %d from Kafka", Matrix.class.getSimpleName(), matrix.getEventId())); 
      return matrix; 
     }) 
     .filter(Objects::nonNull) 
     .to(Sink.actorRef(getSelf(), NotUsed.getInstance())) 
     .run(ActorMaterializer.create(getContext())); 
} 

おかげでたくさん。

答えて

0

すべてのメッセージはクラスタ内のすべてのノードで消費される必要がありますが、 は1つだけ生成されます。

正しいですが、これはノードごとに一意のグループIDを持つことで実現できます。

トピックを基本的に1つのメッセージに制限するにはどうすればよいですか?現在は ですか?

カフカはcompacted topicsを提供します。 圧縮されたトピックは、特定のキーの最新のメッセージのみを保持します。たとえば、カフカの消費者は、コンパクトなトピックでオフセットを保存します。 あなたのケースでは、すべてのメッセージをと同じキーで生成し、Kafka Log Cleanerは古いメッセージを削除します。圧縮が定期的に行われることに注意してください、あなたは時間の短い期間のために同じキーを持つ2つ(またはそれ以上)のメッセージで終わることができますしてください(あなたのLog Cleaner configurationに依存します。

私は、最新のメッセージを消費する必要がありますトピックに関係なく、私は前にそれを消費している かどうか。

あなたは、消費者のオフセット(falseからenable.auto.commitセット)をコミットし、earliestauto.offset.resetを設定しないことによってこれを達成することができます。あなた固めトピックで一つのメッセージを有することにより、およびトピックの始めから始まる消費者、その混乱年齢はノードの起動後に常に消費されます。

メッセージがあるかどうかを確認する必要があります。メッセージがない場合は、メッセージを作成するには プロデューサに依頼してください。

残念ながら、私は残念ながら、あなたに役立つカフカの機能については気づいていません。ほとんどの場合、カフカは生産者と消費者を切り離すために使用されます。

関連する問題