私はkafka-topics.bat --zookeeper localhost:2181 --alter --topic test --config cleanup.policy=compact delete config min.cleanable.dirty.ratio=0.01 --config segment.ms=100 --config delete.retention.ms=100
を使用してトピックをコンパクトにしました。 私は同じキーを持つ2000のメッセージを送っています。私がそれらのメッセージを消費するとき、私は1つの圧縮されたメッセージではなく、それぞれのメッセージを個別に取得します。トピックのためのカフカ締固め
1
A
答えて
0
あなたが参照している圧縮設定は、Kafkaクライアントとメッセージをどのように消費するかには関係ありません。詳細はofficial documentation hereをご確認ください。
クライアントがメッセージをどのように消費するかを制御するには、client config propertiesを使用してクライアントを設定する必要があります。
トピックを300ミリ秒間プールし、各メッセージを個別に処理するために反復処理できる一連のメッセージ(ConsumerRecords)を受信する状況を考えてみましょう。
while(true) {
ConsumerRecords<String, JsonNode> records = kafkaConsumer.poll(300);
if(records.count() > 0) {
for(ConsumerRecord<String, JsonNode> record: records) {
if(counter % 500 == 0) {
log.info("Record recovered, groupId: {}, topicName: {}, key: {}, value: {} , offset: {}",
this.groupId, this.topicNames, record.key(), record.value(), record.offset());
}
}
}
}
関連する問題
- 1. AngularJS/Jasmineテストの締め日
- 2. 削除のためにマークされたカフカのトピックを削除するには
- 3. 締めよレデューサーとアクション
- 4. カフカ複数トピックの消費
- 5. カフカのブートストラップコンフィギュレーションカフカコネクタからカフカの専門家のための
- 6. Apacheのアペックス - カフカ0.9安全なカフカのトピック
- 7. カフカの__consumer_offsetsトピックの問題0.10.2.0
- 8. 200+カフカのトピックでスケーリングする
- 9. 締め切り契約のトークンの自動転送
- 10. 各四半期の初日までに締め切り
- 11. UrlGenerationException:[:topics.updateルート]のために必要なパラメータの欠落[URI:トピック/ {トピック}]
- 12. このアプリのための固めの言語/フレームワーク
- 13. クライアントとユーザのためのカフカの割り当て
- 14. 外界とのカフカ通信のためのポートを開く
- 15. 固定長テキストファイルのための.NETライブラリ
- 16. ドッカーの待ち時間のためにカフカがタイムアウトします
- 17. SHFB他のHTMLコンテンツをドキュメントプロジェクトに含めるための概念的なトピック
- 18. トピックの「ビュー」を格納するための最良の方法
- 19. カフカの特定のトピックのログサイズを制限する
- 20. カフカの特定のトピックの保存方法を確認する
- 21. ユニットテストのためのkafkaトピックをクリアする
- 22. Docker zookeeperは作成したカフカのトピックについて忘れました
- 23. 締め切りからunix時間に変換する
- 24. 締め切りはいつ適切ですか?
- 25. JavaScript - 締め切りがリセットされない
- 26. 締め切り:pug - h1タグ内にイタリック体を作成する
- 27. 最後のメッセージをカフカのトピックに送信
- 28. HOWTOは、私はカフカカサンドラにカフカのトピックから接続セットアップしたいカフカからカサンドラ
- 29. jQuery Mobile:固定ヘッダーを使用するための固定サイドバー
- 30. 最も早い締め切りの最初のアルゴリズムのパラメータの説明
私たちはこのkey = value1 key = value2のようなものを持っているとしましょう。ここでvalue2はkeyの更新された値です。更新された値だけを保持するにはどうすればよいですか? – codehacker
こんにちは、kafkaの仕事ではなく、最新の情報だけを保持することはできません。kafka 0.10.1で導入されたkafkaストリームとKtableまたはGlobalKTableを使用する必要があると言われています。または0.10.0まで私は今までには覚えていない.... https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/ https:// kafka .apache.org/documentation/streams#streams_kstream_ktable –
@codehacker - 上記のコメントを参照してください。あなたにタグを付けるのを忘れました。 –