私はpysparkのs3バケットからデータを読み込んでいます。私は、読み取り操作を並列化し、データを変換する必要があります。しかし、その投げ違いの誤り。以下はコードです。pysparkのs3バケットからデータを取得
s3 = boto3.resource('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key)
bucket = s3.Bucket(bucket)
prefix = 'clickEvent-2017-10-09'
files = bucket.objects.filter(Prefix = prefix)
keys=[k.key for k in files]
pkeys = sc.parallelize(keys)
グローバル変数d
は空のリストです。そして私はこれにdeviceId
データを追加しています。
applying flatMap on the keys
pkeys.flatMap(map_func)
この機能
def map_func(key):
print "in map func"
for line in key.get_contents_as_string().splitlines():
# parse one line of json
content = json.loads(line)
d.append(content['deviceID'])
しかし、上記のコードは私にエラーを与えます。 誰でも手伝ってください!
私はこれで完全に素朴です。だから私を救う! –
私は、s3に複数のjsonファイルを格納していて、クリックストリームデータを持っています(毎日複数のファイルがある10日間のデータファイルを調べなければなりません)。私はこれらのファイルを読んで、いくつかの条件を満たす行だけを保存する必要があるので、すべての行を読んでチェックを行う必要があります。読み込み操作をキーに並列化すると、すべてのファイルの結果を照合することが可能になります。 –
一般的なワークフローは、各イベントをDataFrameの行としてロードするためにspark.read.jsonになります。次に、SQLのような操作を使用してDataFrameを操作できます。あなたのケースでは、あなたが気にする行を選択するフィルタメソッド。あなたのケースに必要な他の変換を行い、DF.writeまたはcollectを使用して結果をどこかに保存します。 – RyanW