2016-09-15 23 views
7

以下に示すように、私はカフカにワイルドカードのパターンを使用して購読しています。ワイルドカードは動的な顧客IDを表します。kafka-pythonを使用して複数のkafkaワイルドカードパターンのリストを購読するには?

consumer.subscribe(pattern='customer.*.validations') 

トピック文字列から顧客IDを引き出すことができるため、これはうまくいきます。しかし今、私は少し異なる目的のために同様の話題を聞く機能を拡張する必要があります。それをcustomer.*.additional-validationsとしましょう。多くの機能が共有されているため、コードは同じプロジェクトに存在する必要がありますが、キューのタイプに基づいて別のパスを取る必要があります。

Kafka documentationでは、トピックの配列を購読することができます。しかし、これらはハードコードされた文字列です。柔軟性を可能にするパターンではありません。

>>> # Deserialize msgpack-encoded values 
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads) 
>>> consumer.subscribe(['msgpackfoo']) 
>>> for msg in consumer: 
...  assert isinstance(msg.value, dict) 

どういうわけか2つの組み合わせを行うことができますか?種類のこの(非稼働)のように:KafkaConsumerコードで

consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations']) 

答えて

8

、それは

https://github.com/dpkp/kafka-python/blob/68c8fa4ad01f8fef38708f257cb1c261cfac01ab/kafka/consumer/group.py#L717

def subscribe(self, topics=(), pattern=None, listener=None): 
     """Subscribe to a list of topics, or a topic regex pattern 
     Partitions will be dynamically assigned via a group coordinator. 
     Topic subscriptions are not incremental: this list will replace the 
     current assignment (if there is one). 

ですから、正規表現を作成することができ、トピックのリスト、またはパターンをサポートしていますOR条件で|を使用すると、複数の動的トピックregexの購読として機能するはずです。内部では一致のためにreモジュールが使用されます。

(customer.*.validations)|(customer.*.additional-validations)

+0

ありがとうございます!この構文は素晴らしいです。 –

関連する問題