2017-08-29 6 views
0

kafka 0.8.x docカフカの消費者にマルチスレッドにする方法を示しています嵐の中KafkaSpoutのマルチスレッドか

Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
topicCountMap.put(topic, new Integer(a_numThreads)); 
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 


// now launch all the threads 
// 
executor = Executors.newFixedThreadPool(a_numThreads); 

// now create an object to consume the messages 
// 
int threadNumber = 0; 
for (final KafkaStream stream : streams) { 
    executor.execute(new ConsumerTest(stream, threadNumber)); 
    threadNumber++; 
} 

しかしKafkaSpoutマルチスレッドではないようです。 たぶんKafkaSpoutにマルチタスクの代わりに、マルチスレッドを使用します。

builder.setSpout(SqlCollectorTopologyDef.KAFKA_SPOUT_NAME, new KafkaSpout(spoutConfig), nThread); 

どちらが良いですか?ありがとう

答えて

0

あなたがKafka 0.8.xに言及して以来、あなたが使用しているKafkaSpoutがstorm-kafka-client以外のstorm-kafkaから来たと仮定しています。

最初のコードスニペットは、複数のスレッドを使用して複数のパーティションを使用する高水準のコンシューマのAPIです。

カフカの噴出口については、おそらく同じですが、ストームは低レベルの消費者、つまりSimpleConsumerを使用しています。ただし、各スパウトエグゼキュータ(タスク)ごとに1つのSimpleConsumerインスタンスが作成されます。

関連する問題