2016-05-31 5 views
0

私はKafka 0.10.0を使用しています。処理する前に、パーティション内のレコードのサイズを知りたい。カフカ0.10でトピック区画のオフセット範囲を見つける方法は?

バージョン0.9.0.1では、次のコードを使用して、パーティションのオフセットをlatestearliestの間で変更しました。新しいバージョンでは、consumer#positionメソッドを取得するときに機能しません。

package org.apache.kafka.example.utils; 

import java.util.ArrayList; 
import java.util.Collections; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 

import org.apache.commons.lang3.Range; 
import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.common.PartitionInfo; 
import org.apache.kafka.common.TopicPartition; 
import org.apache.kafka.common.serialization.ByteArraySerializer; 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 

public class FindTopicRange { 

    private static Logger logger = LogManager.getLogger(); 

    public FindTopicRange() { 
     // TODO Auto-generated constructor stub 
    } 

    public static Map<TopicPartition, Range<Long>> getOffsets(String topic) { 

     Map<TopicPartition, Range<Long>> partitionToRange = new HashMap<>(); 
     try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(getConsumerConfigs())) { 

      List<TopicPartition> partitions = new ArrayList<>(); 
      for (PartitionInfo partitionInfo : consumer.partitionsFor(topic)) { 
       partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); 
      } 
      consumer.assign(partitions); 

      for (TopicPartition partition : partitions) { 
       consumer.seekToBeginning(Collections.singletonList(partition)); 
       long earliestOffset = consumer.position(partition); 

       consumer.seekToEnd(Collections.singletonList(partition)); 
       long latestOffset = consumer.position(partition); 
       partitionToRange.put(partition, Range.between(earliestOffset, latestOffset)); 
      } 
      return partitionToRange; 
     } catch (Exception e) { 
      logger.error("Exception while getting offset range information for topic : {}", topic, e); 
     } 
     return partitionToRange; 
    } 

    private static Properties getConsumerConfigs() { 
     Properties configs = new Properties(); 
     configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
     configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "test"); 
     configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10240); 

     configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); 
     configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); 
     return configs; 
    } 

    public static void main(String[] args) { 
     System.out.println(getOffsets("hello")); 
    } 

} 

上記の呼び出しのためのスタックトレースを以下に示します。

"main" prio=10 tid=0x00007f1750013800 nid=0x443 runnable [0x00007f1756b88000] 
    java.lang.Thread.State: RUNNABLE 
     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) 
     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) 
     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79) 
     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87) 
     - locked <0x00000007c21cba00> (a sun.nio.ch.Util$2) 
     - locked <0x00000007c21cb9f0> (a java.util.Collections$UnmodifiableSet) 
     - locked <0x00000007c21cb8d8> (a sun.nio.ch.EPollSelectorImpl) 
     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98) 
     at org.apache.kafka.common.network.Selector.select(Selector.java:454) 
     at org.apache.kafka.common.network.Selector.poll(Selector.java:277) 
     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:324) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:306) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1405) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1196) 
+0

'KafkaConsumer'をインスタンス化せずに' latest'と 'earliest'オフセットをフェッチできる方法はありますか? –

答えて

0

は、新しいコンシューマ・グループと偽のそれにしようとしたことがありますか?この投稿は、ラグ値を与えることができることを示していますother method

1

あなたの例をscalaで作ったことができました(すでに同様のコードを使っていました)。私が作成した唯一の追加は、consumer.subscribeとconsumer.assignの両方が怠惰だと仮定して、consumer.pollをコードに追加することでした。

val partitions = new util.ArrayList[TopicPartition] 

for (partitionInfo <- consumer.partitionsFor(topic)) { 
partitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))} 

val recordTemp = consumer.poll(1000) 

for (partition <- partitions) { 
    consumer.seekToBeginning(Collections.singletonList(partition)) 
    println(consumer.position(partition)) 
    consumer.seekToEnd(Collections.singletonList(partition)) 
    println(consumer.position(partition)) 
} 
関連する問題