2017-06-28 10 views
1

私は、どのように私のpysparkシェル(およびスクリプト)にS3ディレクトリからファイルを読み込むかを理解しました。使用して:それは私がONEディレクトリ内のすべてのファイルを読み込むさせるには素晴らしいことだが、Spark EMRを介してネストされたディレクトリのS3ファイルを読む

rdd = sc.wholeTextFiles('s3n://bucketname/dir/*') 

をしかし、私はすべてのディレクトリからすべての単一のファイルを読むことをお勧めします。

メモリの問題が発生するため、すべてを一度に展開したり読み込んだりしません。

代わりに、バッチ方式で各サブディレクトリのすべてのファイルを自動的にロードする必要があります。それは可能ですか?

ここに私のディレクトリ構造があります:

S3_bucket_name - >年(2016または2017) - >月(最大12のフォルダ) - >日(最大31のフォルダ) - >サブ日間のフォルダ(最大30;基本的には毎日収集を分割した)。このような

何か、それはすべての12ヶ月のために行くと、最大31日よ除いて...

BucketName 
| 
| 
|---Year(2016) 
|  | 
|  |---Month(11) 
|  |  | 
|  |  |---Day(01) 
|  |  |  | 
|  |  |  |---Sub-folder(01) 
|  |  |  | 
|  |  |  |---Sub-folder(02) 
|  |  |  | 
|  |  |---Day(02) 
|  |  |  | 
|  |  |  |---Sub-folder(01) 
|  |  |  | 
|  |  |  |---Sub-folder(02) 
|  |  |  | 
|  |---Month(12) 
| 
|---Year(2017) 
|  | 
|  |---Month(1) 
|  |  | 
|  |  |---Day(01) 
|  |  |  | 
|  |  |  |---Sub-folder(01) 
|  |  |  | 
|  |  |  |---Sub-folder(02) 
|  |  |  | 
|  |  |---Day(02) 
|  |  |  | 
|  |  |  |---Sub-folder(01) 
|  |  |  | 
|  |  |  |---Sub-folder(02) 
|  |  |  | 
|  |---Month(2) 

上記各矢印のフォークを表します。例えば私は2年間のデータを収集していたので、 "年"フォークには2年あります。それから、毎年、最大12か月まで、そして毎月、最大31の可能な日のフォルダ。そして、私はそのようにそれを分割するという理由だけで、毎日では、最大30個のフォルダがあるでしょう...

私はそれが理にかなって願っています...

私は別のポスト(read files recursively from sub directories with spark from s3 or local filesystem)を見ていました - この場合にはそこには保証されませんし、私はすべてのものが必要になり

rdd = sc.wholeTextFiles('s3n://bucketname/*/data/*/*') 

しかし、それに伴う問題は、それがさまざまなサブディレクトリ間で共通のフォルダを見つけようとしている:彼らはワイルドカードを使用して提案し、その何かのように考えています。

はしかし、推論のその行に、私は私がやった場合思った..:

rdd = sc.wholeTextFiles("s3n://bucketname/*/*/*/*/*') 

しかし、問題は、それが一度にすべてをロードしておかしくなり、おそらくので、今、私はのOutOfMemoryエラーが出ていることです。そう、例えば、一日のサブディレクトリレベルに

移動し、中のものをお読みください。

理想的には、私が行うことができるだろうか、このです

最初に2016/12/01、次に2016/12/02、2012/12/31まで、次に2017/01/01、2017/01/02、2017/01/02、... 2017/01/31など。

こうして、上記のように5つのワイルドカード(*)を使用する代わりに、私はどうにかして "day"のレベルで各サブディレクトリを調べることを知っています。

私はそれぞれの日のファイルパスを指定するためにPython辞書を使用することを考えましたが、それはやや面倒なアプローチのようです。

file_dict = { 
    0:'2016/12/01/*/*', 
    1:'2016/12/02/*/*', 
    ... 
    30:'2016/12/31/*/*', 
} 

基本的に、すべてのフォルダ、およびそれらを反復処理し、このようなものを使用して、それらをロードするために:私はそれの意味することは、次の通りである

sc.wholeTextFiles('s3n://bucketname/' + file_dict[i]) 

しかし、私はしたくありませんすべてのパスを手動で入力してください。質問をする別の方法は、私はバッチ方法で、ネストされたサブディレクトリ構造からファイルを読みますか、ある

:私はこれが意味をなさを願って...

EDIT?どのように私はs3のバケツで可能なすべてのフォルダ名をpythonで列挙できますか?ちょうどそれのいずれか、

{json object 1}, 
{json object 2}, 
{json object 3}, 
... 
{json object n}, 

それが「真JSON」であるためには、次のように

私の各ファイル内のデータの構造は次のとおりです。たぶんそれは...

EDIT2役立つだろうこのような後端にカンマ、または何かせずに上記のようにするために必要な(角括弧に注意して、最終的な末尾のコンマの欠如:

[ 
    {json object 1}, 
    {json object 2}, 
    {json object 3}, 
    ... 
    {json object n} 
] 

私のように完全にPySparkでそれをやった理由私が提出したスクリプトは、この書式設定の不具合を手動で処理することを余儀なくされたからです。私がHive/Athenaを使用する場合、私はそれに対処する方法がわかりません。

答えて

1

Hive、さらにはAthenaを使用しない理由は何ですか?これらは、ファイルシステムのテーブルトップを展開して、すべてのデータにアクセスできるようにします。そして、あなたはまた

スパークし、この中に取り込むことができ、私はあなたにも、あなたのファイルシステムの場所の岩下tempTableを設定するにはスパークでHiveQLを使用することができると信じて、それはあなたが実行できるHiveテーブルとしてそれをすべて登録しますSQLに対して私はそれをしてからしばらくしていますが、それは間違いなく可能です。

+0

私はそれを調べます。 HDFSが動作するのと同じ方法でs3をファイルシステムとして扱うことができるかどうかわからなかったので、私は確信していませんでしたが、HiveやAthenaで推測していますが、バケット内のすべてにアクセスできるはずです。 。それをチェックします。アイデアをありがとう。 – shishy

+0

S3 infactは、HDFSとしてEMRに直接さらされます。それはEMRFSと呼ばれるものです。 hadoopコマンドを実行しているときに、 'hadoop fs -copyToLocal s3://bucket/file.txt。/'を実行することができます(構文はかなり間違っているかもしれません)。 – Henry

+0

正直言って、私のハイブの経験はかなり初歩的です私のデータは基本的に次のように構成されているので、これを行う方法がわかりません。各ファイルには複数のjsonオブジェクト(つぶやき)があります。しかし、各jsonの書式設定は完璧ではありません。マルチjson(行区切り)では、通常は最後にコンマが表示されません。ちょうど改行。しかし、私の場合、各ファイルの構造はOPに追加する新しい編集で説明したとおりです。 – shishy

関連する問題