私はPySparkを使用しています。私はs3にgziped jsonファイルのリストを持っています。これは、アクセスして、変換して、パーケットでs3にエクスポートする必要があります。各jsonファイルには約100k行が含まれていますので、パラレル化するとそれほど意味がありません(しかし、並列化することはできます)が、並列化したファイルは約5kです。私のアプローチは、スクリプトにjsonファイルリストを渡すことです - >リスト上で並列化を実行する - >マップを実行します(これは私がブロックされるところです)。 jsonにアクセスして変換して、変換されたjsonからDFを作成し、それをs3に寄木細工としてダンプします。スパークでネストされたジョブを実行中
-1
A
答えて
0
jsonを分散して読み込むには、言及したようにキーを並列化する必要があります。 s3からの読み取り中にこれを行うには、boto3を使用する必要があります。以下は、そうする方法のスケルトンスケッチです。あなたのユースケースに合わせてdistributedJsonReadを変更する必要があるでしょう。
import boto3
import json
from pyspark.sql import Row
def distributedJsonRead(s3Key):
s3obj = boto3.resource('s3').Object(bucket_name='bucketName', key=key)
contents = json.loads(s3obj.get()['Body'].read().decode('utf-8'))
return Row(**contents)
pkeys = sc.parallelize(keyList) #keyList is a list of s3 keys
dataRdd = pkeys.map(distributedJsonRead)
Boto3参考:http://boto3.readthedocs.org/en/latest/guide/quickstart.html
編集:1対処する:
後で出力ファイルに入力ファイルの1マッピングを、それがマージされた寄木細工のデータセットを持つことに容易になるだろうという可能性がありますと連携。 JSONの1マッピング:これはあなたがそれを行うために必要な方法がある場合しかし、あなたは私はあなたが1をしたい場合は、これらの操作を並列化することはできませんとは思わないこの
for k in keyList:
rawtext = sc.read.json(k) # or whichever method you need to use to read in the data
outpath = k[:-4]+'parquet'
rawtext.write.parquet(outpath)
ような何かを試みることができます寄木細工のファイル。 Sparkの読み書き機能は、ドライバによって呼び出されるように設計されており、scおよびsqlContextにアクセスする必要があります。これは、1つの寄木細工のディレクトリを持っている可能性が高い別の理由です。
関連する問題
- 1. スパーク・スタンドアロン・クラスタ、1つのエグゼキュータ上で実行中のジョブ
- 2. herokuに展開されたdjangoアプリケーションでスケジュールされたジョブを実行中
- 3. Laravel Queue Worker、RabbitMQ、リモートで生成された実行中のジョブ
- 4. Quartzのネストされたジョブ
- 5. スパーク - スパーク・ジョブに割り当てられる実行者とコアの数
- 6. スケジュールされたジョブを実行する
- 7. ネストされたデータフレームでスプレッドを実行
- 8. qsub GNUパラレル実行中ジョブ
- 9. Sidekiqは他のジョブが実行された後にジョブを実行します
- 10. googleアプリケーションエンジンでmapreduceジョブを実行中
- 11. スパーク:ジョブ
- 12. SQL Serverジョブはスケジュールされたジョブで実行中にエラーで終了しますが、手動でジョブを実行するとうまく動作します。
- 13. Delayed :: Herokuで2回実行中のジョブ?
- 14. Giraphジョブはローカルモードで常に実行中
- 15. スパークヤーンで1000ジョブが実行中
- 16. 起動アプリケーション中にジョブが実行される
- 17. スパークETLジョブは一度だけmysqlを実行します
- 18. PHP MySQLでループ中にネストされた文を実行するには?
- 19. Google dataproc sparkジョブで「ジョブの実行中にノードが再起動されました」が失敗します。メッセージ
- 20. ネストされたクエリの実行者
- 21. Hudson:現在実行中のジョブのconfig.xmlを更新し、実行中のジョブが更新されたconfig.xmlを即座に認識するようにする
- 22. 長時間実行中のWebジョブがAzureによって中止される
- 23. スケジュールされたSQLジョブが実行されていない
- 24. ネストされたTableViewControllerでtvOSが実行されます
- 25. React-router:ネストされたPlainRouteでgetChildRoutesが実行されない
- 26. 毎時スパーク・ジョブ・サーバーの自動スパーク・ジョブをスケジュールする
- 27. スパークSQLでネストされた属性にアクセスする方法
- 28. hadoopが「実行中のジョブ」でスタックしました
- 29. 長時間実行中のジョブSpring
- 30. のMapReduce:INFO mapreduce.Job:実行中のジョブ:job_1506081922448_0004
パスの昏睡連結リストをリーダに渡すことができます。 – zero323
sparkのインストールによっては、sparkから直接s3を読むことができるかもしれません: 'rawtext = sc.textFile( 's3:// bucket/file')' – Paul
私が寄木張りをダンプしているときは、 1.json => 1.parquetを意味するので、ファイル名をカンマで区切ると、この一貫性は失われます。 – Sar009