2017-04-05 4 views
0

私のKafkaプロデューサーとKafkaクライアントが別のHadoopクラスターにあるのに対し、私のローカルマシンでKafkaコンシューマー(Javaコード)を実行したいですか?Kafkaローカルマシン上で動作するConsumer-Java API

kafkaプロデューサスクリプト(kafka-console-producer.sh)を使用してkafkaクラスタにメッセージを送信できますが、ローカルのEclipseコンソールでkafkaコンシューマのメッセージにアクセスできません。

設定の変更が必要な場合は教えてください。任意の例のJavaコードに対応

+1

何が問題なのかを見るためにコンシューマコードを共有できますか? – Kaushal

+0

@Kaushal、以下のコードを投稿しました。 – Sanj

+0

あなたのクラスタ上でコンシューマjarをローカルで実行しようとします。 –

答えて

0
package com.hortonworks.example.kafka.consumer; 
import org.apache.kafka.clients.CommonClientConfigs; 
import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.common.TopicPartition; 
import java.util.Collection; 
Configuring Kafka for Kerberos 
Over Ambari 
May 9, 2016 
9 
import java.util.Collections; 
import java.util.Properties; 
public class BasicConsumerExample { 
public static void main(String[] args) {`enter code here` 
Properties consumerConfig = new Properties(); 
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka. 
example.com:6667"); 
// specify the protocol for SSL Encryption 
consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
"SASL_PLAINTEXT"); 
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); 
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest"); 
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer"); 
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org. 
apache.kafka.common.serialization.StringDeserializer"); 
KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerConfig); 
TestConsumerRebalanceListener rebalanceListener = new 
TestConsumerRebalanceListener(); 
consumer.subscribe(Collections.singletonList("test-topic"), 
rebalanceListener); 
while (true) { 
ConsumerRecords<byte[], byte[]> records = consumer.poll(1000); 
for (ConsumerRecord<byte[], byte[]> record : records) { 
System.out.printf("Received Message topic =%s, partition =%s, 
offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), 
record.offset(), record.key(), record.value()); 
} 
consumer.commitSync(); 
} 
} 
private static class TestConsumerRebalanceListener implements 
ConsumerRebalanceListener { 
@Override 
public void onPartitionsRevoked(Collection<TopicPartition> partitions) 
{ 
System.out.println("Called onPartitionsRevoked with partitions:" + 
partitions); 
} 
@Override 
public void onPartitionsAssigned(Collection<TopicPartition> partitions) 
{ 
System.out.println("Called onPartitionsAssigned with partitions:" + 
partitions); 
} 
} 
} 
関連する問題