2016-04-07 7 views
-1

私はPySparkを使用しています。私はs3にgziped jsonファイルのリストを持っています。これは、アクセスして、変換して、パーケットでs3にエクスポートする必要があります。各jsonファイルには約100k行が含まれていますので、パラレル化するとそれほど意味がありません(しかし、並列化することはできます)が、並列化したファイルは約5kです。私のアプローチは、スクリプトにjsonファイルリストを渡すことです - >リスト上で並列化を実行する - >マップを実行します(これは私がブロックされるところです)。 jsonにアクセスして変換して、変換されたjsonからDFを作成し、それをs3に寄木細工としてダンプします。スパークでネストされたジョブを実行中

+0

パスの昏睡連結リストをリーダに渡すことができます。 – zero323

+0

sparkのインストールによっては、sparkから直接s3を読むことができるかもしれません: 'rawtext = sc.textFile( 's3:// bucket/file')' – Paul

+0

私が寄木張りをダンプしているときは、 1.json => 1.parquetを意味するので、ファイル名をカンマで区切ると、この一貫性は失われます。 – Sar009

答えて

0

jsonを分散して読み込むには、言及したようにキーを並列化する必要があります。 s3からの読み取り中にこれを行うには、boto3を使用する必要があります。以下は、そうする方法のスケルトンスケッチです。あなたのユースケースに合わせてdistributedJsonReadを変更する必要があるでしょう。

import boto3 
import json 
from pyspark.sql import Row 

def distributedJsonRead(s3Key): 
    s3obj = boto3.resource('s3').Object(bucket_name='bucketName', key=key) 
    contents = json.loads(s3obj.get()['Body'].read().decode('utf-8')) 
    return Row(**contents) 

pkeys = sc.parallelize(keyList) #keyList is a list of s3 keys 
dataRdd = pkeys.map(distributedJsonRead) 

Boto3参考:http://boto3.readthedocs.org/en/latest/guide/quickstart.html

編集:1対処する:

後で出力ファイルに入力ファイルの1マッピングを、それがマージされた寄木細工のデータセットを持つことに容易になるだろうという可能性がありますと連携。 JSONの1マッピング:これはあなたがそれを行うために必要な方法がある場合しかし、あなたは私はあなたが1をしたい場合は、これらの操作を並列化することはできませんとは思わないこの

for k in keyList: 
    rawtext = sc.read.json(k) # or whichever method you need to use to read in the data 
    outpath = k[:-4]+'parquet' 
    rawtext.write.parquet(outpath) 

ような何かを試みることができます寄木細工のファイル。 Sparkの読み書き機能は、ドライバによって呼び出されるように設計されており、scおよびsqlContextにアクセスする必要があります。これは、1つの寄木細工のディレクトリを持っている可能性が高い別の理由です。

+0

しかし、私はまた、寄木細工が生成されたファイルから、jsonが 's3:// bucket/json/12/3/abc.json'にあると仮定すると、その結果は' s3:// bucket/json/12/3/abc.parquet' – Sar009

+0

@ Sar009私はこの – David

+0

に対処するために私の応答を編集しました。約20GBのデータが1日に生成されます。そして、一般的に私は、このような場合には1日または2日のデータを分析しなければならない。だから、基本的には私はファイルを並列化することができないと言っている。 – Sar009

関連する問題