2017-05-30 6 views
0

sparkを使用してJSONファイルを書き込もうとしています。値としてnullを持ついくつかのキーがあります。これらは、DataSetでうまく表示されますが、ファイルを書き込むと、キーがドロップされます。それらを確実に保持するにはどうすればよいですか?ソースからJSONデータのsparkでJSONを書き込むときにnull値を持つキーを保持

ddp.coalesce(20).write().mode("overwrite").json("hdfs://localhost:9000/user/dedupe_employee"); 

部:

コードは、ファイルを書き込む

"event_header": { 
     "accept_language": null, 
     "app_id": "App_ID", 
     "app_name": null, 
     "client_ip_address": "IP", 
     "event_id": "ID", 
     "event_timestamp": null, 
     "offering_id": "Offering", 
     "server_ip_address": "IP", 
     "server_timestamp": 1492565987565, 
     "topic_name": "Topic", 
     "version": "1.0" 
    } 

出力:上記の例キーaccept_languageapp_nameevent_timestampにおいて

"event_header": { 
     "app_id": "App_ID", 
     "client_ip_address": "IP", 
     "event_id": "ID", 
     "offering_id": "Offering", 
     "server_ip_address": "IP", 
     "server_timestamp": 1492565987565, 
     "topic_name": "Topic", 
     "version": "1.0" 
    } 

が落ちた。

答えて

1

明らかに、sparkはnullを処理するオプションを提供していません。したがって、以下のカスタムソリューションが機能するはずです。

{"accept_language":null,"app_id":"App_ID","app_name":null,"client_ip_address":"IP","event_id":"ID","event_timestamp":null,"offering_id":"Offering","server_ip_address":"IP","server_timestamp":1492565987565,"topic_name":"Topic","version":"1.0"} 

import com.fasterxml.jackson.module.scala.DefaultScalaModule 
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper 
import com.fasterxml.jackson.databind.ObjectMapper 

case class EventHeader(accept_language:String,app_id:String,app_name:String,client_ip_address:String,event_id: String,event_timestamp:String,offering_id:String,server_ip_address:String,server_timestamp:Long,topic_name:String,version:String) 

val ds = Seq(EventHeader(null,"App_ID",null,"IP","ID",null,"Offering","IP",1492565987565L,"Topic","1.0")).toDS() 

val ds1 = ds.mapPartitions(records => { 
val mapper = new ObjectMapper with ScalaObjectMapper 
mapper.registerModule(DefaultScalaModule) 
records.map(mapper.writeValueAsString(_)) 
}) 

ds1.coalesce(1).write.text("hdfs://localhost:9000/user/dedupe_employee") 

これは、出力を生成します

関連する問題