2017-07-18 4 views
1

行をforeachPartition内のJSONに変換する方法はありますか? 私はHow to convert Row to json in Spark 2 Scalaを見ました。 しかし、foreachPartition内からsqlContextにアクセスすることはできません。また、データには入れ子型が含まれているため、この方法は機能しません。foreachPartition内で行をJSONに変換するにはどうすればよいですか?

dataframe.foreachPartition { partitionOfRecords => 

    .. 
    val connectionString: ConnectionStringBuilder = new ConnectionStringBuilder(
       eventHubsNamespace, 
       eventHubName, 
       policyName, 
       policyKey) 

    val eventHubsClient: EventHubClient = EventHubClient.createFromConnectionString(connectionString.toString()).get() 

    val json = /* CONVERT partitionOfRecords to JSON */ 

    val bytes = json.getBytes() 
    val eventData = new EventData(bytes) 
    eventHubsClient.send(eventData) 
    } 

答えて

1

私は強くforeachPartition前にJSON への変換を行うことをお勧めしたいです。

なぜなら、to_json関数を使用して文字列化されたJSONを構築するために使用できるfunctionsオブジェクトにJSONの組み込みサポートが組み込まれているからです。

to_jsonを(E:カラム):カラムは、指定されたスキーマとStructType又はStructTypesArrayType JSONに文字列を含む列に変換します。

私は次のことをやってお勧めします:

dataframe. 
    select(to_json($"your-struct-column-here")). 
    as[String]. 
    foreachPartition { json: String => ... } 
関連する問題