2016-05-09 6 views
5

分析のためにSPARK SQLコードからデータを照会するRESTインターフェイスを提供するWebサービスからJSONデータを読み取る必要があります。 BLOBストアに格納されたJSONを読み込んで使用できます。Spark SQL:DataFrameとしてRESTサービスからjsonデータを消費する方法

私は、RESTサービスからデータを読み込み、それ以外のDataFrameのように使うのが最善の方法であると思っていました。

私はSPARK 1.6 of Linux cluster on HD insightを使用しています。また、誰かが同じコードスニペットを共有することができれば感謝します。私はまだSPARK環境にはまったく新しいです。スパーク1.6で

+0

としてデータフレームを作成することは、それはあなたがダウンロードした後、(ノードに配布)並列化したい単一のJSONブロブである、またはそれは多くありますあなたはむしろノードで直接ダウンロードしたい異なるJSON文字列ですか?単一のBLOBの場合は、http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasetsに記載されているようにフォーマットされます。「各行には、独立した有効なJSONオブジェクト " – Ashish

+1

@Ashish:これらは実際には私のウェブサービスによって、どこかで8から10のオーダーで公開されている実際には複数のファイルであり、サイズはそれほど大きくありません。私の分析のために、HDFSに保存されたデータのコンテキストとして主に使用されます。私はリンクを見ましたが、すべての例でローカルファイルを探していますが、これは 'val path ="というようにいくらか読めるようになっています。http://www.examples/src/main/resources?type = people "' – Kiran

+0

jsonは階層構造でデータフレームはフラットなので、Sparkは任意のjsonをデータフレームに解析できません。 jsonがsparkによって作成されていない場合は、「各行に別々の独立した有効なJSONオブジェクトが含まれている必要があります」という要件を満たさないため、カスタムコードを使用して解析し、大文字小文字のクラスオブジェクトまたはスパークSQL行。スカラーで解析する方法の1つがhttp://stackoverflow.com/questions/37003083/spark-parquet-nested-value-flatten/37005148#37005148 – Ashish

答えて

4

あなたはPythonの上にある場合は、情報を取得し、それからRDDを作成するrequestsライブラリを使用しています。 Scala用の類似のライブラリが必要です(関連thread)。 それからちょうど行いますScalaのため

json_str = '{"executorCores": 2, "kind": "pyspark", "driverMemory": 1000}' 
rdd = sc.parallelize([json_str]) 
json_df = sqlContext.jsonRDD(rdd) 
json_df 

コード:JSONは、などの階層構造とデータフレームであるため、スパークは、データフレームに任意のJSONを解析することはできません http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets

+0

@キランあなたの質問に答える答えを受け入れてください – aggFTW

1

:から

val anotherPeopleRDD = sc.parallelize(
    """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) 
val anotherPeople = sqlContext.read.json(anotherPeopleRDD) 

これは、平らな。 jsonがsparkによって作成されていない場合、conditionに準拠していない可能性があります。「各行には、別々の独立した有効なJSONオブジェクトが含まれている必要があります」ので、カスタムコードを使用して解析し、大文字小文字のオブジェクトのコレクションまたはスパークSQL行。

あなたは次のようにダウンロードすることができます。

import scalaj.http._ 
val response = Http("proto:///path/to/json") 
    .header("key", "val").method("get") 
    .execute().asString.body 

、その後shown in this answerとしてあなたJSONを解析します。そして、(配列言う)あなたの場合、クラスのオブジェクトの配列を作成し、

seq.toDF 
関連する問題