0
私はOSGIフレームワークを持っています。そこでは、1つのバンドルでREST呼び出しを受け入れ、残りの呼び出しで受け取ったデータはKAFKAブローカーに送られます。ブローカーからのメッセージを消費する別のバンドルがあります。OSGIフレームワークでKafka 0.9.0.0コンシューマを設定するには?
RESTバンドルの前にKAFKAコンシューマバンドルを初期化すると、コードがKAFKAコンシューマーコードのwhileループで実行されるため、REST bundleActivatorは呼び出されません。消費者バンドルの前にRESTバンドルを初期化すると、コンシューマーバンドルは決して開始しません。続き
はKAFKAバンドルのアクティベーターのためのコードです。:
public class KafkaConsumerActivator implements BundleActivator {
private static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
private static final String GROUP_ID = "group.id";
private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
private static final String KEY_DESERIALIZER = "key.deserializer";
private ConsumerConnector consumerConnector;
private KafkaConsumer<String, String> consumer;
private static final String VALUE_DESERIALIZER = "value.deserializer";
public void start(BundleContext context) throws Exception {
Properties properties = new Properties();
properties.put(ZOOKEEPER_CONNECT,
MosaicThingsConstant.KAFKA_BROCKER_IP + ":" + MosaicThingsConstant.ZOOKEEPER_PORT);
properties.put(GROUP_ID, MosaicThingsConstant.KAFKA_GROUP_ID);
properties.put(BOOTSTRAP_SERVERS,
MosaicThingsConstant.KAFKA_BROCKER_IP + ":" + MosaicThingsConstant.KAFKA_BROCKER_PORT);
properties.put(KEY_DESERIALIZER, StringDeserializer.class.getName());
properties.put(VALUE_DESERIALIZER, StringDeserializer.class.getName());
consumer = new KafkaConsumer<>(properties);
try {
consumer.subscribe(Arrays.asList(MosaicThingsConstant.KAFKA_TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> data = new HashMap<>();
data.put("partition", record.partition());
data.put("offset", record.offset());
data.put("value", record.value());
System.out.println(": " + data);
}
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
}
}
}
おかげでキリスト教徒!私はちょうどメッセージのためにKafka Brockerを聴き続けるスレッドを走らせ、stopメソッドでスレッドを殺しました。 –
これは適切なアプローチですが、スレッドを "殺す"方法には注意が必要です。 Javaでスレッドを殺す本当の方法はありません。あなたは 'interrupt'メソッドを呼び出すことでうまくやめてください。したがって、そのスレッド内のループは、時折、自身の中断された状態をチェックし、要求されたときにきれいに終了する必要があります。 –