2016-05-04 13 views
1

現在、私はsparkストリーミングとjsonのカフカからデータを取得しています。 自分のrddをデータフレームに変換してテーブルとして登録します。私は列名がデータフレームでない存在するクエリを発射するとき、それは データフレームにカラム名が存在しませんspark

"'No such struct field currency in price, recipientId;'" 

HEre is my query 
val selectQuery = "lower(serials.brand) as brandname, lower(appname) as appname, lower(serials.pack) as packname, lower(serials.asset) as assetname, date_format(eventtime, 'yyyy-MM-dd HH:00:00') as eventtime, lower(eventname) as eventname, lower(client.OSName) as platform, lower(eventorigin) as eventorigin, meta.price as price, client.ip as ip, lower(meta.currency) as currency, cast(meta.total as int) as count" 

Here is my dataframe 
DataFrame[addedTime: bigint, appName: string, client: struct<ip:string>, eventName: string, eventOrigin: string, eventTime: string, geoLocation: string, location: string, meta: struct<period:string,total:string>, serials: struct<asset:string,brand:string,pack:string>, userId: string]> 

のようなエラーがスローされますことをやった後、今私のJSONは、厳密なものではなく、いくつかのキーが存在しないかもしれない時間があります。データフレームにキーや列がない場合、この例外を安全に回避するにはどうすればよいですか?

答えて

0

だから、私が見つけた唯一の方法は、あなたのJSONのためのJSONスキーマを作成し、datafrmae

ヴァルDF = sqlcontext.read.schema(スキーマ).json(RDD)

にあなたのJSONを解析するためにそのスキーマを使用していました
1

df.columnsを使用して列をチェックできます。列名とデータ型df.schemaを取得する方法はたくさんあります。スキーマdf.printSchema()をログに記録することもできます

関連する問題