2017-06-22 5 views
1

私は3つのパーティションを持つトピックに接続された単一のカフカコンシューマを持っています。カフカからレコードを取得するとすぐに、私はオフセットとパーティションをキャプチャしたいと思います。再起動時に私はカフカのドキュメントからカフカ同じオフセットから再起動

を相殺最後の読み取りから、消費者の位置を復元したいと思います:

各レコードは、独自に付属しているので、あなた自身が、あなただけの次の操作を行う必要がありオフセット管理するために、オフセット:

設定のenable.auto.commit = falseを

はオフセットあなた 位置を保存するために、各ConsumerRecordを提供を使用してください。

再起動時にseek (TopicPartition、long)を使用してコンシューマの位置を復元します。ここで

私のサンプルコードです:

constructor{  
    load data into offsetMap<partition,offset> 
    initFlag=true; 
} 

Main method 
{ 
    ConsumerRecords<String, String> records = consumer.poll(100); 
    if(initFlag) // is this correct way to override offset position? 
    { 
     seekToPositions(offsetMap); 
     initFlag=false; 
    } 
    while(!shutdown) 
    { 
     for (ConsumerRecord<String, String> record : records) { 
       System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 
       getOffsetPositions();// dump offsets and partitions to db/disk 
     } 
    } 
} 

//get current offset and write to a file 
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception{ 

    Map<Integer, Long> offsetMap = new HashMap<Integer, Long>(); 
    //code to put partition and offset into map 
    //write to disk or db 

    } 
} // Overrides the fetch offsets that the consumer 

public synchronized void seekToPositions(Map<Integer, Long> offsetMap) { 
      //code get partitions and offset from offsetMap 
      consumer.seek(partition, offset); 

    } 

はこれが行うには正しい方法は何ですか?もっと良い方法がありますか?

答えて

1

あなたのオフセットをコミットした場合カフカは(デフォルトでは24時間まで)あなたのためにそれらを格納します。

あなたの消費者が死亡した場合、別のマシンで同じコードを開始し、中断した場所から右に続行することができます。外部ストレージは必要ありません。 https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

を参照してください「オフセットと消費者の位置が」あなたはcommitSync

0
私には大丈夫だ

、単にパーティションの割り当ては、特別なケアは、パーティションの割り当てを変更する場合に対処するために必要とされる自動的に行われている場合は、あなたの消費者は、ビルド(手動パーティション逢引または自動)

でどのように注意してください。これは、subscribe(Collection、ConsumerRebalanceListener)およびsubscribe(Pattern、ConsumerRebalanceListener)の呼び出しでConsumerRebalanceListenerインスタンスを提供することによって実行できます。たとえば、コンシューマからパーティションを取得する場合、コンシューマは、ConsumerRebalanceListener.onPartitionsRevoked(Collection)を実装することによって、それらのパーティションのオフセットをコミットする必要があります。コンシューマにパーティションが割り当てられている場合、コンシューマはそれらの新しいパーティションのオフセットを検索し、ConsumerRebalanceListener.onPartitionsAssigned(Collection)を実装することによってコンシューマをその位置に正しく初期化する必要があります。

https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

+0

を使用することを検討してくださいお勧めしますが、その前に指摘したように、私は、手動でパーティションを処理していますはい、アウトハンドリング自動パーティショニングを指していただき、ありがとうございます、私しかいません1コンシューマが3つのパーティションすべてに接続しているので、そのことでうまくいくはずです –

関連する問題