2016-12-07 8 views
5

私はKafkaConsumer 0.10 Java APIを使用しています。私は特定のパーティションと特定のオフセットから消費したい。私は見上げて、シーク方法があるが例外を投げていることを発見した。誰もが同様のユースケースまたはソリューションを持っていましたか?KafkaConsumer 0.10 Java APIエラーメッセージ:パーティションの現在の割り当てがありません

コード:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps); 
consumer.seek(new TopicPartition("mytopic", 1), 4); 

例外

java.lang.IllegalStateException: No current assignment for partition mytopic-1 
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) 
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) 
    at xx.xxx.xxx.Test.main(Test.java:182) 

答えて

18

あなたが消費者に話題のトピックまたはassign()パーティションへsubscribe()からseek()あなた最初に必要ことができる前に。また、subscribe()assign()は遅延しているので、seek()を使用する前にpoll()に「ダミーコール」を行う必要があります。

あなたがsubscribe()を使用する場合は、グループ管理を使用しますので、あなたは同じgroup.idを使用して複数の消費者を開始することができますし、トピックのすべてのパーティションが自動的にグループ内のすべての消費者に均等に割り当てられます(各パーティションは、Aに割り当てられてしまいますグループ内の単一の消費者)。

特定のパーティションを読み取る場合は、assign()を使用して手動割り当てを行う必要があります。これにより、任意の割り当てを行うことができます。

Btw:KafkaConsumerには、例を含む非常に詳細なクラスJavaDocがあります。それを読む価値があります。

+0

ありがとうございました。それは働きました:) assign()とseek()の組み合わせで – colossal

+1

あなたは 'application.id'の代わりに' group.id'を意味すると思います。 – automaticgiant

+0

ここにあまりにも多くの#KafkaStream質問に回答しています... @automaticgiantを指摘してくれてありがとう –

1

poll()を使用してマップレコードを取得し、オフセット自体を変更したくない場合は、コーディネーターイベントの

... 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);  
consumer.subscribe(Arrays.asList("Test_topic1", "Test_topic2")); 
List<TopicPartition> partitions =consumer.partitionsFor("Test_topic1").stream().map(part->{TopicPartition tp = new TopicPartition(part.topic(),part.partition()); return tp;}).collect(Collectors.toList()); 
Field coordinatorField = consumer.getClass().getDeclaredField("coordinator"); 
coordinatorField.setAccessible(true);  

ConsumerCoordinator coordinator = (ConsumerCoordinator)coordinatorField.get(consumer); 
coordinator.poll(new Date().getTime(), 1000);//Watch out for your local date and time settings 
consumer.seekToBeginning(partitions); //or other seek 

投票: カフカバージョン0.11 はこれを試してみてください。これにより、コーディネーターが認識され、消費者がグループに参加したことが保証されます(グループ管理を使用している場合)。これは、有効になっている場合、周期的なオフセットコミットも処理します。

関連する問題