2016-05-06 9 views
0

私はある条件で一時停止したいと思っているカフカの消費者を持っていて、後でそれを再開して前のすべてのメッセージを消費します。 1つのアイデアは、他のスレッドによって更新され、消費する前に共有フラグを使用することです。すなわち、iterator.next().message()フラグの値をチェックします。それが本当であれば、他のものを消費しないでください。ちょうど私が正しい方向に考えているのか、それとももっと良い方法があるのか​​をチェックしたかったのです。高レベルのカフカ消費者を一時停止する

class KafkaConsumer implements Runnable { 
     KafkaStream<byte[], byte[]> topicStream; 
     ConsumerConnector consumerConnectorObj; 

     public KafkaConsumer(final KafkaStream<byte[], byte[]> topicStream, 
       final ConsumerConnector consumerConnectorObj) { 
      this.topicStream = topicStream; 
      this.consumerConnectorObj = consumerConnectorObj; 
     } 

     @Override 
     public void run() { 
      if (topicStream != null) { 
       ConsumerIterator<byte[], byte[]> iterator = topicStream.iterator(); 
       while (true) { 
         if (iterator != null) { 
          boolean nextFlag = true; 
          try { 
           iterator.hasNext(); 
          } catch (ConsumerTimeoutException e) { 
           LOG.warn("Consumer timeout occured", e); 
           nextFlag = false; 
          } 

          if (nextFlag) { 
           byte[] msg = iterator.next().message(); 
          } 
         } 
       } 
      } 
     } 
    } 

答えて

0

通常、間違ってしまう可能性が高いため、手作業でスレッド間で同期を行う必要はありません。助手のためにjava.util.concurrentを見てください。あなたの場合、それはセマフォかもしれません。それを使用する最も簡単な方法は、メッセージを処理する前にセマフォを取得し、それを後で戻して、次のループですぐに再度取得しようとすることです。

私の勘違いは、これを行うには最良の方法ではないということです。私はむしろavailablePermits()に電話をかけ、その番号が大きい方を使い続けています。ゼロになったときにのみ、セマフォを取得しようとします。これにより、もう1つのスレッドが1つの許可を再度提供するまでスレッドをブロックします。これによりワーカースレッドのブロックが解除され、パーミットが渡されます。これは直ちに元の状態に戻り、上記のようにループします。

while (true) { 
    if (semaphore.availablePermits()<=0) { 
    semaphore.acquire(); // will block 
    // eventually another thread increments the semaphore again 
    // and we arrive here 
    semaphore.release(); 
    } 
    // keep processing messages 
} 

あなたはthis questionへの回答に追加のアイデアを見つけることができます。

+0

ありがとうHarald私はそれを試してみると戻ってきます –

関連する問題