私はspring-kafka libを使用してKafka消費者を実装しました。 私は2つのパーティションを持つKafkaトピックを持っていますが、コンテナインスタンスがspring-kafka documentationに従って単一パーティションから消費する必要があるため、並行性レベルを2に設定したConcurrentKafkaListenerContainerFactory
を使用します。Spring-kafka listener concurenncy
KafkaMessageListenerContainerは、すべてのメッセージをすべての トピック/パーティションから1つのスレッドで受信します。 ConcurrentMessageListenerContainerは、マルチスレッド消費を提供するために、1つ以上の KafkaMessageListenerContainerに委譲します。
私の消費者のクラスがあります:
@Component
public class KafkaConsumer {
private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>();
@KafkaListener(topics = "${kafka.topic}", groupId = "events_group")
public void receive(ConsumerRecord<?, ?> record, Consumer consumer) throws InterruptedException {
String message = record.value().toString();
Event event = EventFactory.createEvent(message);
String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID);
// add event to hashMap
LinkedBlockingQueue<Event> queue = hashMap.get(customerId);
if (queue == null) {
queue = new LinkedBlockingQueue<>();
queue.add(event);
hashMap.put(customerId, queue);
} else {
queue.add(event);
}
}
}
あなたは私がその中に「HashMapの」コレクションを持って見ての通り、私はメッセージに基づいて対応するキューに自分のイベントを置く属性を「CUSTOMER_ID」。 このような機能には、複数のスレッドアクセスの場合に追加の同期が必要です。spring-kafkaは、同時実行の問題を避けるために、各コンテナの別々のBeanインスタンスではなく、すべてのコンテナに対してBeanインスタンスを1つだけ作成します。
このロジックをプログラムで変更するにはどうすればよいですか?
この問題を解決する唯一の奇妙な方法は、2つのJVMが単一スレッドのコンシューマを持つ個別のアプリケーションを実行することです。その結果、#receiveメソッドによるKafkaConsumerクラスへのアクセスはシングルスレッドになります。
説明のためにthx – MeetJoeBlack