2016-09-19 22 views
4

私は、将来的にサイズが大きくなる可能性のあるトピックのリストを(現在は10です)持っています。私は各トピックから消費する複数のスレッド(トピックごと)を生成することができますが、私の場合はトピックの数が増えるとトピックから消費するスレッドの数が増えます。データをあまりにも頻繁に取得するので、スレッドは理想的な状態になります。複数話題のカフカ消費者

単一の消費者にすべてのトピックから消費させる方法はありますか?はいの場合、どのように達成できますか?また、オフセットはカフカによってどのように維持されますか?答えを提案してください。

答えて

4

我々は、次のAPIを使用して複数のトピックをサブスクライブすることができます。 consumer.subscribe(は、Arrays.asList(topic1、topic2)、ConsumerRebalanceListenerのOBJ)

消費者は、トピックの情報を持っており、我々はconsumer.commitAsyncや消費者を使用してCOMITすることができます次のようにOffsetAndMetadataオブジェクトを作成して.commitSync()を呼び出します。

ConsumerRecords<String, String> records = consumer.poll(long value); 
for (TopicPartition partition : records.partitions()) { 
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); 
    for (ConsumerRecord<String, String> record : partitionRecords) { 
     System.out.println(record.offset() + ": " + record.value()); 
    } 
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); 
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); 
} 
+0

私は知っていますが、オフセットはどのようにカフカによって維持されますか?また、単一の消費者グループを持つことで私の問題が解決されますか? – Apollo

+1

オフセットはアプリケーションによってコミットされ、__consumer_offsetsという特別なオフセットカーフカのトピックに格納されます。オフセットは各トピックの各パーティションごとに保持されるので、購読しているトピックの数は関係ありません。 –

1

複数のスレッドは必要ありません。複数のトピックから消費する1つのコンシューマを持つことができます。 kafka-server自体はステートレスなので、オフセットはzookeeperによって維持されます。 コンシューマがメッセージを消費するたびに、そのメッセージのオフセットがzookeeperによってコミットされ、将来のトラックで各メッセージを一度しか処理しないようにします。したがって、カフカの故障の場合でも、消費者は最後にコミットされたオフセットから消費を開始します。

+1

カフカ0.9以降では、オフセットは動物園の代わりにカフカのトピックに保存されています –

関連する問題