こんにちは、私はSpark Streamingを初めて使用しています。私はXMLファイルを読んで、カフカのトピックに送信しようとしています。ここに私のKafkaコードがあります。これはKafka-console-consumerにデータを送信します。kafka-Spark-Streamingからデータを読み取っているときにEmptyが取得される
コード:
package org.apache.kafka.Kafka_Producer;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutionException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
@SuppressWarnings("unused")
public class KafkaProducer {
private static String sCurrentLine;
public static void main(String args[]) throws InterruptedException, ExecutionException{
try (BufferedReader br = new BufferedReader(new FileReader("/Users/sreeharsha/Downloads/123.txt")))
{
while ((sCurrentLine = br.readLine()) != null) {
System.out.println(sCurrentLine);
kafka(sCurrentLine);
}
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();}
}
public static void kafka(String sCurrentLine) {
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class","kafka.producer.DefaultPartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
producer.send(new KeyedMessage<String, String>("sample",sCurrentLine));
producer.close();
}
}
私はカフカConsoleの消費者のデータを受け取ることができます。下のスクリーンショットでは、トピックに送信したデータを見ることができます。
今、私は火花ストリーミングを使用してカフカ・コンソール・消費者に送信するデータをストリームする必要があります。ここにコードがあります。以下のバージョンを使用して
:あなた以下
./spark-submit --class org.apache.spark_streaming.Spark_Kafka_Streaming.SparkStringConsumer --master local[4] Spark_Kafka_Streaming-0.0.1-SNAPSHOT.jar
は、データが受信されたかのスクリーンショットを見ることができます。このように私の仕事を提出しながら、
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
public class SparkStringConsumer {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("kafka-sandbox")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
Set<String> topics = Collections.singleton("sample");
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
directKafkaStream.foreachRDD(rdd -> {
System.out.println("--- New RDD with " + rdd.partitions().size()
+ " partitions and " + rdd.count() + " records");
rdd.foreach(record -> System.out.println(record._2));
});
ssc.start();
ssc.awaitTermination();
}
}
emptysetを取得します:
スパーク - 2.0.0
飼育係-3.4.6
カフカ - 0.8.2.1
任意の提案は、
SparkReceiverクラスのコードはどこですか?トピックを "mytopic"として使用しているSparkStringConsumerクラスとトピック "sample"に関するメッセージを送信しているKafkaProducerクラスを投稿しました。確認していただけますか? – abaghel
今更新されましたか? –
kafkaに新しいメッセージを作成してみてください –