2017-11-03 42 views
0

私はpysparkのs3バケットからデータを読み込んでいます。私は、読み取り操作を並列化し、データを変換する必要があります。しかし、その投げ違いの誤り。以下はコードです。pysparkのs3バケットからデータを取得

s3 = boto3.resource('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) 
bucket = s3.Bucket(bucket) 

prefix = 'clickEvent-2017-10-09' 
files = bucket.objects.filter(Prefix = prefix) 
keys=[k.key for k in files] 
pkeys = sc.parallelize(keys) 

グローバル変数dは空のリストです。そして私はこれにdeviceIdデータを追加しています。

applying flatMap on the keys

pkeys.flatMap(map_func) 

この機能

def map_func(key): 
    print "in map func" 
    for line in key.get_contents_as_string().splitlines(): 
    # parse one line of json 
    content = json.loads(line) 
    d.append(content['deviceID']) 

しかし、上記のコードは私にエラーを与えます。 誰でも手伝ってください!

答えて

0

あなたには2つの問題があります。まず、sparkとhadoopに組み込まれた直接S3サポートを使用する代わりに、botoを使用してS3からデータを手動で読み込もうとしています。 1行にjsonレコードを含むテキストファイルを読み込もうとしているようです。その場合は、あなただけのスパークでこれを行うことができます。

df = spark.read.json('s3://my-bucket/path/to/json/files/') 

これは、行として、各ラインでJSONデータを読み込むことにより、あなたのためのスパークデータフレームを作成します。 DataFramesには、JSONデータの一部をサンプリングして決定することを目指す、あらかじめ定義された厳密なスキーマ(リレーショナルデータベーステーブルなど)が必要です。

df.select('deviceID') 

指摘する価値、他の問題は、あなたのスパーククラスタ全体で計算されたデータを保存するために、グローバル変数を使用しようとしているです:あなたはデータフレームを持った後、あなたの列を取得するために必要なすべてがこのようにそれを選択しています。ブロードキャスト変数または暗黙の終了を使用して、ドライバからスパークワーカーで実行されているすべてのエグゼキュータにデータを送信することは可能です。しかし、エグゼキュータからドライバの変数に書き込む方法はありません!エグゼキュータからデータをドライバに転送するには、まさにこの目的のためにsparkのActionメソッドを使う必要があります。

アクションは、sparkに計算結果が欲しいと指示するメソッドです。これは、あなたが話した変換を実行する必要があるためです。あなたのケースでは、おそらくどちらかをしたいと思う:

結果が大きい場合: DATAFRAME:結果が小さい場合、バックS3

をごtranformationsの結果を保存するために使用DataFrame.write。あなたのドライバにそれらをダウンロードして何かをするcollect()

+0

私はこれで完全に素朴です。だから私を救う! –

+0

私は、s3に複数のjsonファイルを格納していて、クリックストリームデータを持っています(毎日複数のファイルがある10日間のデータファイルを調べなければなりません)。私はこれらのファイルを読んで、いくつかの条件を満たす行だけを保存する必要があるので、すべての行を読んでチェックを行う必要があります。読み込み操作をキーに並列化すると、すべてのファイルの結果を照合することが可能になります。 –

+0

一般的なワークフローは、各イベントをDataFrameの行としてロードするためにspark.read.jsonになります。次に、SQLのような操作を使用してDataFrameを操作できます。あなたのケースでは、あなたが気にする行を選択するフィルタメソッド。あなたのケースに必要な他の変換を行い、DF.writeまたはcollectを使用して結果をどこかに保存します。 – RyanW

関連する問題