2

私はSparkエンジンからスパークストリーミングを学び始めました。データ解析とスパークにはまったく新しいものです。私は、将来のデータを予測したい小さなIOTアプリケーションを作成したいだけです。Java SparkストリーミングJSON解析

私はこのトンで、次のようにリアルタイムセンサーJSONデータを送信TIVAのハードウェア、

[{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}] 

を持っているが、データが掲載されているUNIXタイムスタンプです。 センサーは、各センサー( 's')データを 'd'としてセンサーの配列です。

私がしたいことは、このデータを消費し、spark-streamingするオブジェクトを作成し、sparkのMlib(機械学習)または同等のライブラリを通してすべてのデータを渡して将来のデータを予測することです。

私はこれは私が使用することを決定したすべての技術の選択肢

  1. で可能になるかどうかの一般的な考え方をしたいですか?
  2. 入れ子になったJSONをどのように消費できますか? SQLContextを使ってみましたが、成功しませんでした。
  3. 私がここでやろうとしていることを達成するための一般的なガイドライン。

ここは、私がKAFKAからのメッセージを消費するために使用しているコードです。

SparkConf conf = new SparkConf().setAppName("DattusSpark").setMaster("local[2]"); 

    JavaSparkContext sc = new JavaSparkContext(conf); 

    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 

    // TODO: processing pipeline 
    Map<String, String> kafkaParams = new HashMap<String, String>(); 
    kafkaParams.put("metadata.broker.list", "kafkaserver_address:9092"); 
    Set<String> topics = Collections.singleton("RAH"); 


    JavaPairInputDStream<String, String> directKafkaStream = 
      KafkaUtils.createDirectStream(ssc, String.class, String.class, StringDecoder.class, 
        StringDecoder.class, kafkaParams, topics); 


    JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() { 
     public String call(Tuple2<String,String> message) throws Exception { 
      System.out.println(message._2()); 
      return message._2(); 
     }; 
    }); 


    System.out.println(" json is 0------ 0"+ json); 



    json.foreachRDD(rdd -> { 
     rdd.foreach(
       record -> System.out.println(record)); 
    }); 

    ssc.start(); 
    ssc.awaitTermination(); 

PS:リニアリティと優れたパフォーマンスを維持するために、これをJavaで行いたいと思います。ご質問

+0

(スパークSQL、MLのように)任意のスパークモジュールで使用することができ、あなたがこれまで試したどのようなコードを投稿することができますか?それはSpark SQLとStreamingを使用して可能です。 – Shankar

+0

問題の投稿されたコード。 –

+0

'sqlContext'でjson文字列を読み込もうとすると、あなたはどのような問題に直面していますか?そのタスクはシリアル化できない問題ですか? – Shankar

答えて

2

回答:

1)これは私が使用することを決定したすべての技術の選択肢が可能になりますかどうか?

`Ans: Yes it can be done and quiet a normal use-case for spark.` 

2)ネストされたJSONをどのように消費できますか? SQLContextを使ってみましたが、成功しませんでした。

`Ans: Nested JSON with SQLContext is little tricky. You may want to use Jackson or some other JSON library.` 

3)私がここでやろうとしていることを達成するための一般的なガイドライン。

Ans: Consuming messages through kafka seems fine, but only a limited machine learning algorithms are supported through streaming.

あなたが他の機械学習アルゴリズムまたはサードパーティのライブラリを使用したい場合は、おそらくあなたは最後にモデルをemmitingバッチジョブとしてモデルの作成を検討すべきです。ストリーミングジョブは、モデルをロードしてデータストリームを取得し、予測するだけです。

+0

このようなユースケースの適切なドキュメントに案内できますか?それは非常に有用です –

4

あなたがSparkSessionから、スパーク2.0を使用しているので、あなたはJSON

json.foreachRDD(rdd -> { 

     DataFrame df= spark.read.json(rdd) 
     //process json with this DF. 
} 

を読むことができるか、そして、あなたがcreateDataFrameメソッドを使用することができ、行のRDDにRDDを変換することができます。

json.foreachRDD(rdd -> { 

      DataFrame df= spark.createDataFrame(rdd); 
      //process json with this DF. 
    } 

DFからネストされたJSON処理が可能です。thisの記事に従うことができます。あなたがDFにあなたのJSONに変換後

また、あなたが

+0

私の場合SQLContextのコンストラクタで私は使用を試みたが非難されました。また、 'JavaSparkContext'を使用して 'sc'(SparkContext)を取得する方法が得られません。 –

+0

@RahulBorkar:SQLContext(javasparkContext) – Shankar

+0

に 'JavaSparkContext'を渡すことができます。また、あなたのコードを試してみると、 "メソッドの変換(関数、JavaRDD >)JavaDStream型のあいまいです" –