2017-10-11 10 views
2

JSONを持つS3からファイルを読み込みたいユースケースがあります。次に、特定のJSONノードの値に基づいて、データをグループ化してS3に書き出します。Apache SparkでカラムをS3に分割する

データを読み取ることはできますが、JSONキーに基づいてデータをパーティション分割してS3にアップロードする方法についての良い例は見つかりません。誰もがどのような例を提供することができますか、このユースケースで私を助けることができるチュートリアルを教えてください?

私は、データフレームを作成した後、私のデータのスキーマを持っている:

root 
|-- customer: struct (nullable = true) 
| |-- customerId: string (nullable = true) 
|-- experiment: string (nullable = true) 
|-- expiryTime: long (nullable = true) 
|-- partitionKey: string (nullable = true) 
|-- programId: string (nullable = true) 
|-- score: double (nullable = true) 
|-- startTime: long (nullable = true) 
|-- targetSets: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- featured: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- data: struct (nullable = true) 
| | | | | |-- asinId: string (nullable = true) 
| | | | |-- pk: string (nullable = true) 
| | | | |-- type: string (nullable = true) 
| | |-- reason: array (nullable = true) 
| | | |-- element: string (containsNull = true) 
| | |-- recommended: array (nullable = true) 
| | | |-- element: string (containsNull = true) 

私はCustomerID列にランダムなハッシュに基づいてデータを分割したいです。私はこれを行うときには:

df.write.partitionBy("customerId").save("s3/bucket/location/to/save"); 

それはエラーを与える:

org.apache.spark.sql.AnalysisException: Partition column customerId not found in schema StructType(StructField(customer,StructType(StructField(customerId,StringType,true)),true), StructField(experiment,StringType,true), StructField(expiryTime,LongType,true), StructField(partitionKey,StringType,true), StructField(programId,StringType,true), StructField(score,DoubleType,true), StructField(startTime,LongType,true), StructField(targetSets,ArrayType(StructType(StructField(featured,ArrayType(StructType(StructField(data,StructType(StructField(asinId,StringType,true)),true), StructField(pk,StringType,true), StructField(type,StringType,true)),true),true), StructField(reason,ArrayType(StringType,true),true), StructField(recommended,ArrayType(StringType,true),true)),true),true)); 

私はCustomerID列にアクセスすることができます教えてください。

答えて

3

今スパーク

val jsonDf = spark.read 
    .format("json") 
    .load("path/of/sample.json") 

jsonDf.show() 

+---------+-------+-----+-----+ 
|  CITY|CUST_ID|STATE| ZIP| 
+---------+-------+-----+-----+ 
| San Jose| 115734| CA|95106| 
|Allentown| 115728| PA|18101| 
|Allentown| 115730| PA|18101| 
|San Mateo| 114728| CA|94401| 
| Somerset| 114726| NJ| 8873| 
+---------+-------+-----+-----+ 

"ZIP"によって、その後のパーティションのデータセットとそれをハッキング開始し、S3

jsonDf.write 
    .partitionBy("ZIP") 
    .save("s3/bucket/location/to/save") 
    // one liner athentication to s3 
    //.save("s3n://$accessKey:$secretKey" + "@" + s"$buckectName/location/to/save") 

Note: In order this code successfully S3 access and secret key has to be configured properly. Check this answer for Spark/Hadoop integration with S3

に書き込むのは、例えば、データセット sample.json

{"CUST_ID":"115734","CITY":"San Jose","STATE":"CA","ZIP":"95106"} 
{"CUST_ID":"115728","CITY":"Allentown","STATE":"PA","ZIP":"18101"} 
{"CUST_ID":"115730","CITY":"Allentown","STATE":"PA","ZIP":"18101"} 
{"CUST_ID":"114728","CITY":"San Mateo","STATE":"CA","ZIP":"94401"} 
{"CUST_ID":"114726","CITY":"Somerset","STATE":"NJ","ZIP":"8873"} 

を見てみましょう

編集:解像度:(コメントごとに)スキーマで見つかっていないパーティション列はcustomerId

customerIdcustomer構造体の内部に存在するので、customerIdは、パーティションの操作を行い抽出してみてください。

df.withColumn("customerId", $"customer.customerId") 
    .drop("customer") 
    .write.partitionBy("customerId") 
    .save("s3/bucket/location/to/save") 
関連する問題