2017-09-21 12 views
0

私はspring-kafka libを使用してKafka消費者を実装しました。 私は2つのパーティションを持つKafkaトピックを持っていますが、コンテナインスタンスがspring-kafka documentationに従って単一パーティションから消費する必要があるため、並行性レベルを2に設定したConcurrentKafkaListenerContainerFactoryを使用します。Spring-kafka listener concurenncy

KafkaMessageListenerContainerは、すべてのメッセージをすべての トピック/パーティションから1つのスレッドで受信します。 ConcurrentMessageListenerContainerは、マルチスレッド消費を提供するために、1つ以上の KafkaMessageListenerContainerに委譲します。

私の消費者のクラスがあります:

@Component 
public class KafkaConsumer { 
    private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>(); 

    @KafkaListener(topics = "${kafka.topic}", groupId = "events_group") 
    public void receive(ConsumerRecord<?, ?> record, Consumer consumer) throws InterruptedException { 
     String message = record.value().toString(); 
     Event event = EventFactory.createEvent(message); 
     String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID); 
     // add event to hashMap 
     LinkedBlockingQueue<Event> queue = hashMap.get(customerId); 
     if (queue == null) { 
      queue = new LinkedBlockingQueue<>(); 
      queue.add(event); 
      hashMap.put(customerId, queue); 
     } else { 
      queue.add(event); 
     } 
    } 
} 

あなたは私がその中に「HashMapの」コレクションを持って見ての通り、私はメッセージに基づいて対応するキューに自分のイベントを置く属性を「CUSTOMER_ID」。 このような機能には、複数のスレッドアクセスの場合に追加の同期が必要です。spring-kafkaは、同時実行の問題を避けるために、各コンテナの別々のBeanインスタンスではなく、すべてのコンテナに対してBeanインスタンスを1つだけ作成します。

このロジックをプログラムで変更するにはどうすればよいですか?

この問題を解決する唯一の奇妙な方法は、2つのJVMが単一スレッドのコンシューマを持つ個別のアプリケーションを実行することです。その結果、#receiveメソッドによるKafkaConsumerクラスへのアクセスはシングルスレッドになります。

答えて

1

これは間違いありません。それがどのように機能するか。フレームワークは本当にBeanではなく、関数にメッセージを渡すメソッドだけに依存しています。

トピック内の各パーティションに対して、2つの@KafkaListenerメソッドがあると考えられます。 1つのパーティションのレコードが1つのスレッド内の@KafkaListenerに配信されることは事実です。したがって、実際にその状態で暮らすことができない場合は、スレッドごとに2つのHashMapを使用することができます。

リスナー抽象化の背景にある一般的な考え方は、正確には約です。ステートレスの動作です。そのKafkaConsumerは通常の春シングルトン beanです。あなたはその事実に生きて、この状況に応じてソリューションを再設計しなければなりません。

+0

説明のためにthx – MeetJoeBlack