データベースからデータを読み取り、Spark SQL集計を適用するSparkジョブがあります。次のようにコードは(唯一のconfオプションを省略)である:Kafkaからスパークスチームを読み込み、JavaでSpark SQL集計を適用します。
SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
sqlContext = new SQLContext(sc);
Dataset df = MongoSpark.read(sqlContext).options(readOptions).load();
df.registerTempTable("data");
df.cache();
aggregators = sqlContext.sql(myQuery);
今私はストリーミングスパーク経由カフカからのメッセージを読み取り、スパークSQL経由で同じ集計を適用する別のジョブを作成します。これまでのコードは次のとおりです。
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "192.168.99.100:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", KafkaStatisticsPayloadDeserializer.class);
kafkaParams.put("group.id", "Group1");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList(topic);
SparkConf conf = new SparkConf().setAppName(topic).setMaster("local");
/*
* Spark streaming context
*/
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(2));
/*
* Create an input DStream for Receiving data from socket
*/
JavaInputDStream<ConsumerRecord<String, StatisticsRecord>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, StatisticsRecord>Subscribe(topics, kafkaParams)
);
これまでのところ、私は正常にメッセージを読み込んでデシリアライズしました。だから私の質問は、実際にスパークSQLアグリゲーションを適用する方法です。私は以下を試みたが、うまくいきませんでした。私は何とか実際のメッセージを含む "値"フィールドをまず分離する必要があると思う。
SQLContext sqlContext = new SQLContext(streamingContext.sparkContext());
stream.foreachRDD(rdd -> {
Dataset<Row> df = sqlContext.createDataFrame(rdd.rdd(), StatisticsRecord.class);
df.createOrReplaceTempView("data");
df.cache();
Dataset aggregators = sqlContext.sql(SQLContextAggregations.ORDER_TYPE_DB);
aggregators.show();
});
これを見ますか? https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.htmlまたはhttps://databricks.com/blog/2017/04/26/processing-data-in-apache- kafka-with-structured-streaming-in-apache-spark-2-2.html –