2017-09-25 10 views
0

gzipファイルを分散してダウンロードしたいと思います。私はすべてのファイルのURLを含むリストを作成し、sparkを使用してそれらを並列化しました。このrddの地図を使って、私は現在のファイルをダウンロードしました。それから私はそれを私のhdfsに保存して、それを再オープンし、botoライブラリを使ってamazones3で再保存したいと思います。関数にgzipファイルを保存するにはrddに適用する

例として、これは私のコードですが、ファイルをダウンロードしてhdfsディレクトリに保存しようとしましたが、パスからエラーが発生しました。

try: 
    # For Python 3.0 and later 
    from urllib.request import urlopen 
except ImportError: 
    # Fall back to Python 2's urllib2 
    from urllib2 import urlopen 

import StringIO 
import gzip 
from gzip import GzipFile 


def dowload_and_save(x): 
    response = urlopen(x) 

    compressedFile = StringIO.StringIO() 
    compressedFile.write(response.read()) 

    compressedFile.seek(0) 

    decompressedFile = gzip.GzipFile(fileobj=compressedFile, mode='rb') 
    with open('http://localhost:50070/webhdfs/user/root/ruben', 'w') as outfile: 
     outfile.write(decompressedFile.read()) 



url_lists=['https://dumps.wikimedia.org/other/pagecounts-raw/2007/2007-12/pagecounts-20071209-190000.gz','https://dumps.wikimedia.org/other/pagecounts-raw/2007/2007-12/pagecounts-20071209-200000.gz'] 

url_lists_rdd=sc.parallelize(url_lists) 

url_lists_rdd.map(dowload_and_save) 

答えて

0

は私が解決策を見つけた

import boto 
from boto.s3.key import Key 
import requests 
import os 
os.environ['S3_USE_SIGV4'] = 'True' 

def dowload_and_save(x): 

    bucket_name='magnet-fwm' 
    k = Key(bucket_name) 

    access_key='' 
    secret='' 

    r = requests.get(x) 
    #return (r.content) 

    c = boto.connect_s3(access_key, secret, host='s3-eu-west-1.amazonaws.com') 
    b = c.get_bucket(bucket_name,validate=False) 

    if r.status_code == 200: 
    #upload the file 
     k = Key(b) 
     k.key = "file.gz" 

     k.content_type = r.headers['content-type'] 
     k.set_contents_from_string(r.content) 
    return 'a' 



list=['https://dumps.wikimedia.org/other/pagecounts-raw/2007/2007-12/pagecounts-20071209-180000.gz','https://dumps.wikimedia.org/other/pagecounts-raw/2008/2008-01/pagecounts-20080101-050000.gz'] 

url_lists_rdd=sc.parallelize(list) 



#url_lists_rdd.map(lambda x: dowload_and_save(x,access_key,secret,bucket_name)) 
a=url_lists_rdd.map(dowload_and_save) 
関連する問題