0

データベースからデータを読み取り、Spark SQL集計を適用するSparkジョブがあります。次のようにコードは(唯一のconfオプションを省略)である:Kafkaからスパークスチームを読み込み、JavaでSpark SQL集計を適用します。

SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster("local"); 
    JavaSparkContext sc = new JavaSparkContext(sparkConf); 
    sqlContext = new SQLContext(sc); 
    Dataset df = MongoSpark.read(sqlContext).options(readOptions).load(); 
    df.registerTempTable("data"); 
    df.cache(); 
    aggregators = sqlContext.sql(myQuery); 

今私はストリーミングスパーク経由カフカからのメッセージを読み取り、スパークSQL経由で同じ集計を適用する別のジョブを作成します。これまでのコードは次のとおりです。

Map<String, Object> kafkaParams = new HashMap<>(); 
    kafkaParams.put("bootstrap.servers", "192.168.99.100:9092"); 
    kafkaParams.put("key.deserializer", StringDeserializer.class); 
    kafkaParams.put("value.deserializer", KafkaStatisticsPayloadDeserializer.class); 
    kafkaParams.put("group.id", "Group1"); 
    kafkaParams.put("auto.offset.reset", "earliest"); 
    kafkaParams.put("enable.auto.commit", false); 

    Collection<String> topics = Arrays.asList(topic); 

    SparkConf conf = new SparkConf().setAppName(topic).setMaster("local"); 

    /* 
    * Spark streaming context 
    */ 
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(2)); 
    /* 
    * Create an input DStream for Receiving data from socket 
    */ 
    JavaInputDStream<ConsumerRecord<String, StatisticsRecord>> stream = 
      KafkaUtils.createDirectStream(
        streamingContext, 
        LocationStrategies.PreferConsistent(), 
        ConsumerStrategies.<String, StatisticsRecord>Subscribe(topics, kafkaParams) 
      ); 

これまでのところ、私は正常にメッセージを読み込んでデシリアライズしました。だから私の質問は、実際にスパークSQLアグリゲーションを適用する方法です。私は以下を試みたが、うまくいきませんでした。私は何とか実際のメッセージを含む "値"フィールドをまず分離する必要があると思う。

SQLContext sqlContext = new SQLContext(streamingContext.sparkContext()); 
    stream.foreachRDD(rdd -> { 
     Dataset<Row> df = sqlContext.createDataFrame(rdd.rdd(), StatisticsRecord.class); 
     df.createOrReplaceTempView("data"); 
     df.cache(); 
     Dataset aggregators = sqlContext.sql(SQLContextAggregations.ORDER_TYPE_DB); 
     aggregators.show(); 
    }); 
+0

これを見ますか? https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.htmlまたはhttps://databricks.com/blog/2017/04/26/processing-data-in-apache- kafka-with-structured-streaming-in-apache-spark-2-2.html –

答えて

0

ストリームに適用される関数内でコンテキストを呼び出す必要があります。

0

私はこの問題を次のコードで解決しました。実際のオブジェクトではなくJSON形式でメッセージを保存するようになりました。

SparkConf conf = new SparkConf().setAppName(topic).setMaster("local"); 
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(2)); 

    SparkSession spark = SparkSession.builder().appName(topic).getOrCreate(); 

    /* 
    * Kafka conf 
    */ 
    Map<String, Object> kafkaParams = new HashMap<>(); 

    kafkaParams.put("bootstrap.servers", dbUri); 
    kafkaParams.put("key.deserializer", StringDeserializer.class); 
    kafkaParams.put("value.deserializer", StringDeserializer.class); 
    kafkaParams.put("group.id", "Group4"); 
    kafkaParams.put("auto.offset.reset", "earliest"); 
    kafkaParams.put("enable.auto.commit", false); 

    Collection<String> topics = Arrays.asList("Statistics"); 

    /* 
    * Create an input DStream for Receiving data from socket 
    */ 
    JavaInputDStream<ConsumerRecord<String, String>> stream = 
      KafkaUtils.createDirectStream(
        streamingContext, 
        LocationStrategies.PreferConsistent(), 
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) 
      ); 
    /* 
    * Keep only the actual message in JSON format 
    */ 
    JavaDStream<String> recordStream = stream.flatMap(record -> Arrays.asList(record.value()).iterator()); 
    /* 
    * Extract RDDs from stream and apply aggregation in each one 
    */ 
    recordStream.foreachRDD(rdd -> { 
     if (rdd.count() > 0) { 
      Dataset<Row> df = spark.read().json(rdd.rdd()); 
      df.createOrReplaceTempView("data"); 
      df.cache(); 

      Dataset aggregators = spark.sql(SQLContextAggregations.ORDER_TYPE_DB); 
      aggregators.show(); 
     } 
    }); 
関連する問題