私はappengine-mapreduce関数を使っていて、私の目的に合わせてデモを修正しました。 基本的に私は、userid、time1、time2という形式で100万以上の行を持っています。私の目的は、各ユーザーIDのtime1とtime2の違いを見つけることです。私はGoogle App Engineの上でこれを実行するようappengine-mapreduceでメモリリミットを打つ
はしかし、私は、ログのセクションでこのエラーメッセージが発生しました:
はこの要求を処理しながら、合計130個の要求 をサービスした後、180.56メガバイト柔らかいプライベートメモリの制限を超えて、この要求を処理したプロセスが多すぎるメモリを使用していることが判明し、終了しました。これにより、アプリケーションへの次の要求に新しいプロセスが使用される可能性があります。このメッセージが頻繁に表示される場合は、アプリケーションでメモリリークが発生する可能性があります。
def time_count_map(data):
"""Time count map function."""
(entry, text_fn) = data
text = text_fn()
try:
q = text.split('\n')
for m in q:
reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
for s in reader:
"""Calculate time elapsed"""
sdw = s[1]
start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p")
edw = s[2]
end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p")
time_difference = time.mktime(end_date) - time.mktime(start_date)
yield (s[0], time_difference)
except IndexError, e:
logging.debug(e)
def time_count_reduce(key, values):
"""Time count reduce function."""
time = 0.0
for subtime in values:
time += float(subtime)
realtime = int(time)
yield "%s: %d\n" % (key, realtime)
は、誰もが、私はより良い私のコードを最適化することができますどのように他示唆することはできますか?ありがとう!!
編集:
ここではパイプラインハンドラがあります:
class TimeCountPipeline(base_handler.PipelineBase):
"""A pipeline to run Time count demo.
Args:
blobkey: blobkey to process as string. Should be a zip archive with
text files inside.
"""
def run(self, filekey, blobkey):
logging.debug("filename is %s" % filekey)
output = yield mapreduce_pipeline.MapreducePipeline(
"time_count",
"main.time_count_map",
"main.time_count_reduce",
"mapreduce.input_readers.BlobstoreZipInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"blob_key": blobkey,
},
reducer_params={
"mime_type": "text/plain",
},
shards=32)
yield StoreOutput("TimeCount", filekey, output)
Mapreduce.yaml:
mapreduce:
- name: Make messages lowercase
params:
- name: done_callback
value: /done
mapper:
handler: main.lower_case_posts
input_reader: mapreduce.input_readers.DatastoreInputReader
params:
- name: entity_kind
default: main.Post
- name: processing_rate
default: 100
- name: shard_count
default: 4
- name: Make messages upper case
params:
- name: done_callback
value: /done
mapper:
handler: main.upper_case_posts
input_reader: mapreduce.input_readers.DatastoreInputReader
params:
- name: entity_kind
default: main.Post
- name: processing_rate
default: 100
- name: shard_count
default: 4
ファイルの残りの部分は、正確にデモと同じです。
私はDropboxの上の私のコードのコピーをアップロードしました:http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip
mapreduceの設定を表示できますか?何らかの理由で、ファイル全体をマッパーに渡しているように見えます。ラインごとにマッピングするのではなく、 –
こんにちはDaniel、私の質問は編集されました。ありがとう、本当にそれを感謝します! – autumngard