2

こんにちは、私はSpark Streamingを初めて使用しています。私はXMLファイルを読んで、カフカのトピックに送信しようとしています。ここに私のKafkaコードがあります。これはKafka-console-consumerにデータを送信します。kafka-Spark-Streamingからデータを読み取っているときにEmptyが取得される

コード:

package org.apache.kafka.Kafka_Producer; 

import java.io.BufferedReader; 
import java.io.FileNotFoundException; 
import java.io.FileReader; 
import java.io.IOException; 
import java.util.Properties; 
import java.util.Properties; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutionException; 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

@SuppressWarnings("unused") 
public class KafkaProducer { 
    private static String sCurrentLine; 
    public static void main(String args[]) throws InterruptedException, ExecutionException{ 
     try (BufferedReader br = new BufferedReader(new FileReader("/Users/sreeharsha/Downloads/123.txt"))) 
     { 
      while ((sCurrentLine = br.readLine()) != null) { 
       System.out.println(sCurrentLine); 
       kafka(sCurrentLine); 
      } 
     } catch (FileNotFoundException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace();} 
    } 
    public static void kafka(String sCurrentLine) { 
     Properties props = new Properties(); 
     props.put("metadata.broker.list", "localhost:9092"); 
     props.put("serializer.class", "kafka.serializer.StringEncoder"); 
     props.put("partitioner.class","kafka.producer.DefaultPartitioner"); 
     props.put("request.required.acks", "1"); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer<String, String> producer = new Producer<String, String>(config); 
     producer.send(new KeyedMessage<String, String>("sample",sCurrentLine)); 
     producer.close(); 
    } 
} 

私はカフカConsoleの消費者のデータを受け取ることができます。下のスクリーンショットでは、トピックに送信したデータを見ることができます。

enter image description here

今、私は火花ストリーミングを使用してカフカ・コンソール・消費者に送信するデータをストリームする必要があります。ここにコードがあります。以下のバージョンを使用して

enter image description here

:あなた以下

./spark-submit --class org.apache.spark_streaming.Spark_Kafka_Streaming.SparkStringConsumer --master local[4] Spark_Kafka_Streaming-0.0.1-SNAPSHOT.jar 

は、データが受信されたかのスクリーンショットを見ることができます。このように私の仕事を提出しながら、

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 

public class SparkStringConsumer { 

    public static void main(String[] args) { 

     SparkConf conf = new SparkConf() 
       .setAppName("kafka-sandbox") 
       .setMaster("local[*]"); 
     JavaSparkContext sc = new JavaSparkContext(conf); 
     JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 

     Map<String, String> kafkaParams = new HashMap<>(); 
     kafkaParams.put("metadata.broker.list", "localhost:9092"); 
     Set<String> topics = Collections.singleton("sample"); 

     JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, 
     String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); 
     directKafkaStream.foreachRDD(rdd -> { 
     System.out.println("--- New RDD with " + rdd.partitions().size() 
      + " partitions and " + rdd.count() + " records"); 
     rdd.foreach(record -> System.out.println(record._2)); 
     }); 
     ssc.start(); 
     ssc.awaitTermination(); 
    } 
} 

emptysetを取得します:

スパーク - 2.0.0

飼育係-3.4.6

カフカ - 0.8.2.1

任意の提案は、

+0

SparkReceiverクラスのコードはどこですか?トピックを "mytopic"として使用しているSparkStringConsumerクラスとトピック "sample"に関するメッセージを送信しているKafkaProducerクラスを投稿しました。確認していただけますか? – abaghel

+0

今更新されましたか? –

+0

kafkaに新しいメッセージを作成してみてください –

答えて

1

が最後にインターネット上でサーフィンをした後、私はこれらの解決策を見つけてください。

"Spark-Submit"と "SetMaster"を同時に使用しないでください。

  • あなたがIDEからコードを実行した場合、あなたのコード
  • で使用SetMasterあなたは「火花送信」を通じてjarファイルを実行した場合

そしてもう一つ、あなたのコード内でsetMasterを入れていませんまず火花瓶を出して、Kafka-Console-Consumerにデータを送ってください。

ワーキングファイン。

関連する問題