2017-11-14 8 views
-1

データを読み取り、タプルから2番目の要素を選択するコードを記述しました。 2番目の要素はJSONです。私は列と行としてmarketplaceId、はcustomerIdなどのようなJSONのキーを持つデータフレームを作成したい、今すぐJsonキーをSparkの列に変換する

{"data": {"marketplaceId":7,"customerId":123,"eventTime":1471206800000,"asin":"4567","type":"OWN","region":"NA"},"uploadedDate":1471338703958} 

:以下

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.conf.Configuration; 
import com.amazon.traffic.emailautomation.cafe.purchasefilter.util.CodecAwareManifestFileSystem; 
import com.amazon.traffic.emailautomation.cafe.purchasefilter.util.CodecAwareManifestInputFormat; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import amazon.emr.utils.manifest.input.ManifestItemFileSystem; 
import amazon.emr.utils.manifest.input.ManifestInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat ; 
import scala.Tuple2; 

val configuration = new Configuration(sc.hadoopConfiguration); 
ManifestItemFileSystem.setImplementation(configuration); 
ManifestInputFormat.setInputFormatImpl(configuration, classOf[TextInputFormat]); 
val linesRdd1 = sc.newAPIHadoopFile("location", classOf[ManifestInputFormat[LongWritable,Text]], classOf[LongWritable], classOf[Text], configuration).map(tuple2 => tuple2._2.toString()); 

は例です: コードは、JSONを取得しますその価値を持つ。私はこれをどのように進めるのか分かりません。誰かが同じことを達成するのに役立つポインタで私を助けることができますか?

答えて

0

あなたは、このリンクに https://coderwall.com/p/o--apg/easy-json-un-marshalling-in-scala-with-jackson

を使用してマーシャリング/アンマーシャルJSONのためのScalaのオブジェクトを作成し、Scalaではケースクラスを使用してJSONデータを読み取るため、そのオブジェクトを使用することができます。

import org.apache.spark.{SparkConf, SparkContext} 

object stackover { 
    case class Data(
        marketplaceId: Double, 
        customerId: Double, 
        eventTime: Double, 
        asin: String, 
        `type`: String, 
        region: String 
       ) 
    case class R00tJsonObject(
          data: Data, 
          uploadedDate: Double 
          ) 

    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf(true) 
    conf.setAppName("example"); 
    conf.setMaster("local[*]") 

    val sc = new SparkContext(conf) 
    val data = sc.textFile("test1.json") 
    val parsed = data.map(row => JsonUtil.readValue[R00tJsonObject](row)) 

    parsed.map(rec => (rec.data, rec.uploadedDate, rec.data.customerId, 
rec.data.marketplaceId)).collect.foreach(println) 
} 
} 

出力:

(Data(7.0,123.0,1.4712068E12,4567,OWN,NA),1.471338703958E12,123.0,7.0) 
関連する問題