2016-07-29 10 views
1

夜間にジョブを実行すると、カフカキュー内のすべてのメッセージが取得され、プロセスが実行されます。私はメッセージを受け取ることができますが、カフカストリームはより多くのメッセージを待っていて、処理を続行できません。私は、次のコードを持っている:キュー内のすべてのカフカメッセージを取得し、Javaでストリーミングを停止する

... 
private ConsumerConnector consumerConnector; 
private final static String TOPIC = "test"; 

public MessageStreamConsumer() { 
     Properties properties = new Properties(); 
     properties.put("zookeeper.connect", "localhost:2181"); 
     properties.put("group.id", "test-group"); 
     ConsumerConfig consumerConfig = new ConsumerConfig(properties); 
     consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); 
    } 
public List<String> getMessages() { 
       Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
       topicCountMap.put(TOPIC, new Integer(1)); 
       Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector 
         .createMessageStreams(topicCountMap); 
       KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0); 
       ConsumerIterator<byte[], byte[]> it = stream.iterator(); 
       List<String> messages = new ArrayList<>(); 
       while (it.hasNext()) 
        messages.add(new String(it.next().message())); 
       return messages; 
      } 

コードは、メッセージを取得することができますが、それはそれはラインにとどまるの最後のメッセージを処理する場合:私は、すべて取得することができますどのように

while (it.hasNext()) 

質問ですが、カフカからのメッセージは、ストリームを停止し、私の他のタスクを続行します。

私はあなたが私にカフカストリームは最初から消費するサポートしていないようだ

おかげ

+0

可能性のある[Kafka Consumerの複製がjavaの.hasNextにぶら下がっている](0120)を参照してください。 – zaynetro

+0

しかし、私は最高だとは思わないこれを行う習慣は、例外がスローされるまで待つことです。もし私のプロセスがタイムアウトが設定されている時間がもっとかかる場合はどうすればいいですか –

+0

KafkaStreamではなく、まっすぐなKafka Consumerを使ってはいけませんか?ストリームは自然に生き続けるでしょう。 – NegatioN

答えて

0

を助けることができると思います。
ネイティブのカフカコンシューマを作成し、auto.offset.resetを最も早く設定すると、最初からメッセージが消費されます。

0

このようなものが動作する可能性があります。基本的には、カフカコンシューマーを使用してレコードを取得し、空のバッチを取得するまで停止するまで考えています。

package kafka.examples; 

import java.text.DateFormat; 
import java.text.SimpleDateFormat; 
import java.util.Calendar; 
import java.util.Collections; 
import java.util.Date; 
import java.util.Properties; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.atomic.AtomicBoolean; 

import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 


public class Consumer1 extends Thread 
{ 
    private final KafkaConsumer<Integer, String> consumer; 
    private final String topic; 
    private final DateFormat df; 
    private final String logTag; 
    private boolean noMoreData = false; 
    private boolean gotData = false; 
    private int messagesReceived = 0; 
    AtomicBoolean isRunning = new AtomicBoolean(true); 
    CountDownLatch shutdownLatch = new CountDownLatch(1); 

    public Consumer1(Properties props) 
    { 
     logTag = "Consumer1"; 

     consumer = new KafkaConsumer<>(props); 
     this.topic = props.getProperty("topic"); 
     this.df = new SimpleDateFormat("HH:mm:ss"); 

     consumer.subscribe(Collections.singletonList(this.topic)); 
    } 

    public void getMessages() { 
     System.out.println("Getting messages..."); 
     while (noMoreData == false) { 
      //System.out.println(logTag + ": Doing work..."); 

      ConsumerRecords<Integer, String> records = consumer.poll(1000); 
      Date now = Calendar.getInstance().getTime(); 
      int recordsCount = records.count(); 
      messagesReceived += recordsCount; 
      System.out.println("recordsCount: " + recordsCount); 
      if (recordsCount > 0) { 
       gotData = true; 
      } 

      if (gotData && recordsCount == 0) { 
       noMoreData = true; 
      } 

      for (ConsumerRecord<Integer, String> record : records) { 
       int kafkaKey = record.key(); 
       String kafkaValue = record.value(); 
       System.out.println(this.df.format(now) + " " + logTag + ":" + 
         " Received: {" + kafkaKey + ":" + kafkaValue + "}" + 
         ", partition(" + record.partition() + ")" + 
         ", offset(" + record.offset() + ")"); 
      } 
     } 
     System.out.println("Received " + messagesReceived + " messages"); 
    } 

    public void processMessages() { 
     System.out.println("Processing messages..."); 
    } 

    public void run() { 
     getMessages(); 
     processMessages(); 
    } 
} 
0

私は現在、カフカ0.10.0.1を使用して開発し、私は実際に何が起こるかを把握するためにいくつかの実験を行ってきた消費者の財産auto.offset.resetの使用に関する混合情報を見つけましたよ。

はこれらに基づいて、私は今、それをこのように理解して:あなたはプロパティを設定する場合:何もコミットがオンに行われていないとき

auto.offset.reset=earliest 

これは、割り当てられたパーティション(で最初に使用可能なメッセージのいずれかに消費者を配置します最後にコミットされたパーティションのオフセットにコンシューマを配置します(コンシューマの再起動ごとに最後にコミットされたメッセージを再読み込みすることに注意してください)。

設定しないでくださいauto.offset.resetどのmea nsのデフォルト値 'latest'が使用されます。

この場合、コンシューマを接続する際に古いメッセージは表示されません。コンシューマの接続後にトピックに公開されたメッセージのみが受信されます。

結論として、特定のトピックと割り当てられたパーティションに対して利用可能なすべてのメッセージを確実に受け取るには、seekToBeginning()を呼び出さなければなりません。

それはあなたの消費者が割り当てられているパーティションを取得することを確認するために最初のポーリング(0L)を呼び出すことをお勧めているようだし、「最初」に割り当てられたパーティションのそれぞれを求める(またはConsumerRebalanceListenerであなたのコードを実装!):

kafkaConsumer.poll(0L); 
kafkaConsumer.seekToBeginning(kafkaConsumer.assignment()); 
関連する問題