2017-05-05 38 views
0

私はSparkストリーミングが初めてです。私が達成しようとしているのは、kafkaのjson文字列データを読み込んでDStreamに格納し、それをElasticsearchにロードできるようにDatasetに変換することです。私はこのpostのコードの一部を使用しています。シンボルJavaSparkSessionSingletonを解決できません。

これは、実際のコードです:私は、シンボルJavaSparkSessionSingletonを解決できないというエラーを取得しています

  import org.apache.spark.SparkConf; 
      import org.apache.spark.api.java.JavaRDD; 
      import org.apache.spark.api.java.JavaSparkContext; 
      import org.apache.spark.api.java.function.VoidFunction; 
      import org.apache.spark.sql.*; 
      import org.apache.spark.sql.streaming.StreamingQuery; 
      import org.apache.spark.sql.streaming.StreamingQueryException; 
      import org.apache.spark.sql.types.DataTypes; 
      import org.apache.spark.sql.types.StructField; 
      import org.apache.spark.sql.types.StructType; 
      import org.apache.spark.streaming.Duration; 
      import org.apache.spark.sql.SparkSession; 
      import org.apache.spark.streaming.api.java.JavaDStream; 
      import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
      import org.apache.spark.streaming.api.java.JavaStreamingContext; 
      import org.apache.spark.streaming.kafka.KafkaUtils; 

      import org.apache.spark.sql.Dataset; 
      import org.apache.spark.sql.Row; 

      import java.util.Collections; 
      import java.util.HashMap; 
      import java.util.Map; 
      import java.util.Set; 
      import org.apache.spark.api.java.function.Function; 


      import kafka.serializer.StringDecoder; 
      import scala.Tuple2; 

      public class SparkConsumer { 

        public static void main(String[] args) throws InterruptedException { 

         SparkConf conf = new SparkConf().setAppName("readKafkajson").setMaster("local[*]"); 

         JavaSparkContext sc = new JavaSparkContext(conf); 

         JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 

         // TODO: processing pipeline 
         Map<String, String> kafkaParams = new HashMap<String, String>(); 
         kafkaParams.put("metadata.broker.list", "localhost:9092"); 
         Set<String> topics = Collections.singleton("kafkajson"); 

         JavaPairInputDStream<String, String> directKafkaStream = 
           KafkaUtils.createDirectStream(ssc, String.class, String.class, StringDecoder.class, 
             StringDecoder.class, kafkaParams, topics); 





         JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() { 
          public String call(Tuple2<String,String> message) throws Exception { 
           System.out.println(message._2()); 
           return message._2(); 
          }; 
         }); 

         System.out.println(" json is 0------ 0"+ json); 



         json.foreachRDD(rdd -> { 
          rdd.foreach(
            record -> System.out.println(record)); 
         }); 

         //Create JavaRDD<Row> 
         json.foreachRDD(new VoidFunction<JavaRDD<String>>() { 
          @Override 
          public void call(JavaRDD<String> rdd) { 
           JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() { 
            @Override 
            public Row call(String msg) { 
             Row row = RowFactory.create(msg); 
             return row; 
            } 
           }); 
           //Create Schema 
           StructType schema = DataTypes.createStructType(new StructField[] {DataTypes.createStructField("Message", DataTypes.StringType, true)}); 
           //Get Spark 2.0 session 
           SparkSession spark = **JavaSparkSessionSingleton**.getInstance(rdd.context().getConf()); 
           Dataset<Row> msgDataFrame = spark.createDataFrame(rowRDD, schema); 
           msgDataFrame.show(); 
          } 
         }); 

         ssc.start(); 
         ssc.awaitTermination(); 

        } 
       } 

私は、Spark 2.0.1を使用していますし、私のMavenの依存関係は次のようになります。

   <dependency> 
         <groupId>org.apache.spark</groupId> 
         <artifactId>spark-core_2.11</artifactId> 
         <version>2.0.1</version> 
        </dependency> 
        <dependency> 
         <groupId>org.apache.spark</groupId> 
         <artifactId>spark-streaming_2.11</artifactId> 
         <version>2.0.1</version> 
        </dependency> 
        <dependency> 
         <groupId>org.apache.spark</groupId> 
         <artifactId>spark-streaming-kafka_2.11</artifactId> 
         <version>1.6.3</version> 
        </dependency> 
        <dependency> 
         <groupId>org.apache.spark</groupId> 
         <artifactId>spark-sql_2.11</artifactId> 
         <version>2.0.1</version> 
        </dependency> 
        <dependency> 

私は、私が行方不明ですかわからないです。どんな助けもありがとうございます。

+0

作成あなたのSparkConsumerのファイルの中に 'JavaSparkSessionSingleton'というクラスがあります。 このSparkの例の最後をSpark docs: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCountから確認してください。 .java – Raphael

答えて

0

公式スパークドキュメントは、あなたのクラスの下にそれを追加し、あなたのセッションを保持するためにシングルトンクラスを作成するためにあなたを導く。

ここスパークドキュメントからのサンプル、完全な例:https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java

+0

ありがとうございます。これはうまくいった! – vkr

関連する問題