2016-05-11 1 views
0

私はOSGIフレームワークを持っています。そこでは、1つのバンドルでREST呼び出しを受け入れ、残りの呼び出しで受け取ったデータはKAFKAブローカーに送られます。ブローカーからのメッセージを消費する別のバンドルがあります。OSGIフレームワークでKafka 0.9.0.0コンシューマを設定するには?

RESTバンドルの前にKAFKAコンシューマバンドルを初期化すると、コードがKAFKAコンシューマーコードのwhileループで実行されるため、REST bundleActivatorは呼び出されません。消費者バンドルの前にRESTバンドルを初期化すると、コンシューマーバンドルは決して開始しません。続き

はKAFKAバンドルのアクティベーターのためのコードです。:

public class KafkaConsumerActivator implements BundleActivator { 

    private static final String ZOOKEEPER_CONNECT = "zookeeper.connect"; 
    private static final String GROUP_ID = "group.id"; 
    private static final String BOOTSTRAP_SERVERS = "bootstrap.servers"; 
    private static final String KEY_DESERIALIZER = "key.deserializer"; 

    private ConsumerConnector consumerConnector; 

    private KafkaConsumer<String, String> consumer; 

    private static final String VALUE_DESERIALIZER = "value.deserializer"; 
    public void start(BundleContext context) throws Exception { 

     Properties properties = new Properties(); 
     properties.put(ZOOKEEPER_CONNECT, 
      MosaicThingsConstant.KAFKA_BROCKER_IP + ":" + MosaicThingsConstant.ZOOKEEPER_PORT); 
     properties.put(GROUP_ID, MosaicThingsConstant.KAFKA_GROUP_ID); 
     properties.put(BOOTSTRAP_SERVERS, 
      MosaicThingsConstant.KAFKA_BROCKER_IP + ":" + MosaicThingsConstant.KAFKA_BROCKER_PORT); 
     properties.put(KEY_DESERIALIZER, StringDeserializer.class.getName()); 
     properties.put(VALUE_DESERIALIZER, StringDeserializer.class.getName()); 
     consumer = new KafkaConsumer<>(properties); 

     try { 
      consumer.subscribe(Arrays.asList(MosaicThingsConstant.KAFKA_TOPIC_NAME)); 

      while (true) { 
       ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); 
       for (ConsumerRecord<String, String> record : records) { 
        Map<String, Object> data = new HashMap<>(); 
        data.put("partition", record.partition()); 
        data.put("offset", record.offset()); 
        data.put("value", record.value()); 
        System.out.println(": " + data); 
       } 
      } 
     } catch (WakeupException e) { 
      // ignore for shutdown 
     } finally { 
      consumer.close(); 
     } 
    } 
} 

答えて

0

あなたはアクティベーターのstartメソッドに時間がかかる何かを行うべきではありません。それはOSGiフレームワーク全体をブロックするでしょう。

接続とループ全体を1つのスレッドで実行することをお勧めします。 stopメソッドでは、このスレッドを終了するように指示できます。

+0

おかげでキリスト教徒!私はちょうどメッセージのためにKafka Brockerを聴き続けるスレッドを走らせ、stopメソッドでスレッドを殺しました。 –

+0

これは適切なアプローチですが、スレッドを "殺す"方法には注意が必要です。 Javaでスレッドを殺す本当の方法はありません。あなたは 'interrupt'メソッドを呼び出すことでうまくやめてください。したがって、そのスレッド内のループは、時折、自身の中断された状態をチェックし、要求されたときにきれいに終了する必要があります。 –

関連する問題