私のサンプルプログラムでは、ファイルを公開してすぐに消費しようとします。しかし、私の消費者イテレータはnullを返します。 私が間違っていることは何ですか?その上でKafka消費者が空のイテレータを返す
テスト
**main(){**
KafkaMessageProducer producer = new KafkaMessageProducer(topic, file);
producer.generateMessgaes();
MessageListener listener = new MessageListener(topic);
listener.start();
}
のMessageListener
public void start() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(CoreConstants.THREAD_SIZE));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
for (KafkaStream<byte[], byte[]> stream : streams) {
System.out.println("The stream is --"+ stream.iterator().makeNext().topic());
executor.submit(new ListenerThread(stream));
}
try { // without this wait the subsequent shutdown happens immediately before any messages are delivered
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
if (consumerConnector != null) {
consumerConnector.shutdown();
}
if (executor != null) {
executor.shutdown();
}
}
ListenerThread
public class ListenerThread implements Runnable {
private KafkaStream<byte[], byte[]> stream;
public ListenerThread(KafkaStream<byte[], byte[]> msgStream) {
this.stream = msgStream;
System.out.println("----------" + stream.iterator().makeNext().topic());
}
public void run() {
try {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
// MessageAndMetadata<byte[], byte[]> messageAndMetadata =
// it.makeNext();
// String topic = messageAndMetadata.topic();
// byte[] message = messageAndMetadata.message();
System.out.println("111111111111111111111111111");
FileProcessor processor = new FileProcessor();
processor.processFile("LOB_TOPIC", it.next().message());
}
iteratorはnullなので、whileループの中には入っていません。しかし、私は同じ話題に単一のメッセージを公開しており、消費者はその話題を聞いていると確信しています。
すべてのヘルプは、私は昨日、この同じ問題を抱えていた
私は同じ問題を抱えています。メッセージがコンシューマ端末に届いているのを見ることができますが、私のアプリは「空のイテレータ」を表示しています。それは以前から仕事をしていて、突然起き始めました。理由を知っている人なら、私は感謝しています。 – Mhoque