私はSparkエンジンからスパークストリーミングを学び始めました。データ解析とスパークにはまったく新しいものです。私は、将来のデータを予測したい小さなIOTアプリケーションを作成したいだけです。Java SparkストリーミングJSON解析
私はこのトンで、次のようにリアルタイムセンサーJSONデータを送信TIVAのハードウェア、
[{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}]
を持っているが、データが掲載されているUNIXタイムスタンプです。 センサーは、各センサー( 's')データを 'd'としてセンサーの配列です。
私がしたいことは、このデータを消費し、spark-streamingするオブジェクトを作成し、sparkのMlib(機械学習)または同等のライブラリを通してすべてのデータを渡して将来のデータを予測することです。
私はこれは私が使用することを決定したすべての技術の選択肢
- で可能になるかどうかの一般的な考え方をしたいですか?
- 入れ子になったJSONをどのように消費できますか? SQLContextを使ってみましたが、成功しませんでした。
- 私がここでやろうとしていることを達成するための一般的なガイドライン。
ここは、私がKAFKAからのメッセージを消費するために使用しているコードです。
SparkConf conf = new SparkConf().setAppName("DattusSpark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
// TODO: processing pipeline
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "kafkaserver_address:9092");
Set<String> topics = Collections.singleton("RAH");
JavaPairInputDStream<String, String> directKafkaStream =
KafkaUtils.createDirectStream(ssc, String.class, String.class, StringDecoder.class,
StringDecoder.class, kafkaParams, topics);
JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() {
public String call(Tuple2<String,String> message) throws Exception {
System.out.println(message._2());
return message._2();
};
});
System.out.println(" json is 0------ 0"+ json);
json.foreachRDD(rdd -> {
rdd.foreach(
record -> System.out.println(record));
});
ssc.start();
ssc.awaitTermination();
PS:リニアリティと優れたパフォーマンスを維持するために、これをJavaで行いたいと思います。ご質問
(スパークSQL、MLのように)任意のスパークモジュールで使用することができ、あなたがこれまで試したどのようなコードを投稿することができますか?それはSpark SQLとStreamingを使用して可能です。 – Shankar
問題の投稿されたコード。 –
'sqlContext'でjson文字列を読み込もうとすると、あなたはどのような問題に直面していますか?そのタスクはシリアル化できない問題ですか? – Shankar