2012-02-12 7 views
8

私はappengine-mapreduce関数を使っていて、私の目的に合わせてデモを修正しました。 基本的に私は、userid、time1、time2という形式で100万以上の行を持っています。私の目的は、各ユーザーIDのtime1とtime2の違いを見つけることです。私はGoogle App Engineの上でこれを実行するようappengine-mapreduceでメモリリミットを打つ

はしかし、私は、ログのセクションでこのエラーメッセージが発生しました:

はこの要求を処理しながら、合計130個の要求 をサービスした後、180.56メガバイト柔らかいプライベートメモリの制限を超えて、この要求を処理したプロセスが多すぎるメモリを使用していることが判明し、終了しました。これにより、アプリケーションへの次の要求に新しいプロセスが使用される可能性があります。このメッセージが頻繁に表示される場合は、アプリケーションでメモリリークが発生する可能性があります。

def time_count_map(data): 
    """Time count map function.""" 
    (entry, text_fn) = data 
    text = text_fn() 

    try: 
    q = text.split('\n') 
    for m in q: 
     reader = csv.reader([m.replace('\0', '')], skipinitialspace=True) 
     for s in reader: 
      """Calculate time elapsed""" 
      sdw = s[1] 
      start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p") 
      edw = s[2] 
      end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p") 
      time_difference = time.mktime(end_date) - time.mktime(start_date) 
      yield (s[0], time_difference) 
    except IndexError, e: 
    logging.debug(e) 


def time_count_reduce(key, values): 
    """Time count reduce function.""" 
    time = 0.0 
    for subtime in values: 
    time += float(subtime) 
    realtime = int(time) 
    yield "%s: %d\n" % (key, realtime) 

は、誰もが、私はより良い私のコードを最適化することができますどのように他示唆することはできますか?ありがとう!!

編集:

ここではパイプラインハンドラがあります:

class TimeCountPipeline(base_handler.PipelineBase): 
    """A pipeline to run Time count demo. 

    Args: 
    blobkey: blobkey to process as string. Should be a zip archive with 
     text files inside. 
    """ 

    def run(self, filekey, blobkey): 
    logging.debug("filename is %s" % filekey) 
    output = yield mapreduce_pipeline.MapreducePipeline(
     "time_count", 
     "main.time_count_map", 
     "main.time_count_reduce", 
     "mapreduce.input_readers.BlobstoreZipInputReader", 
     "mapreduce.output_writers.BlobstoreOutputWriter", 
     mapper_params={ 
      "blob_key": blobkey, 
     }, 
     reducer_params={ 
      "mime_type": "text/plain", 
     }, 
     shards=32) 
    yield StoreOutput("TimeCount", filekey, output) 

Mapreduce.yaml:

mapreduce: 
- name: Make messages lowercase 
    params: 
    - name: done_callback 
    value: /done 
    mapper: 
    handler: main.lower_case_posts 
    input_reader: mapreduce.input_readers.DatastoreInputReader 
    params: 
    - name: entity_kind 
     default: main.Post 
    - name: processing_rate 
     default: 100 
    - name: shard_count 
     default: 4 
- name: Make messages upper case 
    params: 
    - name: done_callback 
    value: /done 
    mapper: 
    handler: main.upper_case_posts 
    input_reader: mapreduce.input_readers.DatastoreInputReader 
    params: 
    - name: entity_kind 
     default: main.Post 
    - name: processing_rate 
     default: 100 
    - name: shard_count 
     default: 4 

ファイルの残りの部分は、正確にデモと同じです。

私はDropboxの上の私のコードのコピーをアップロードしました:http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip

+0

mapreduceの設定を表示できますか?何らかの理由で、ファイル全体をマッパーに渡しているように見えます。ラインごとにマッピングするのではなく、 –

+0

こんにちはDaniel、私の質問は編集されました。ありがとう、本当にそれを感謝します! – autumngard

答えて

2

それはあなたの入力ファイルのサイズは、ソフトメモリ制限を超える可能性があります。大きなファイルの場合は、BlobstoreLineInputReaderまたはBlobstoreZipLineInputReaderのいずれかを使用します。

これらの入力リーダーは、map関数と異なる何かを渡すと、ファイルとテキスト行にstart_positionを渡します。

のように見えるかもしれませんあなたのmap機能:

def time_count_map(data): 
    """Time count map function.""" 
    text = data[1] 

    try: 
     reader = csv.reader([text.replace('\0', '')], skipinitialspace=True) 
     for s in reader: 
      """Calculate time elapsed""" 
      sdw = s[1] 
      start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p") 
      edw = s[2] 
      end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p") 
      time_difference = time.mktime(end_date) - time.mktime(start_date) 
      yield (s[0], time_difference) 
    except IndexError, e: 
     logging.debug(e) 

ジョブがはるかに速く、それは256までの複数のシャードを、使用することができますように実行できるようになりますBlobstoreLineInputReaderを使用して、それはあなたがあなたをアップロードする必要があることを意味圧縮されていないファイルは痛みを伴う可能性があります。アップストリームの帯域幅が非常に大きいので、圧縮ファイルをEC2 Windowsサーバにアップロードしてから処理し、そこから解凍してアップロードします。

+0

これは私のために非常にうまくいった!どうもありがとう! :) – autumngard

6

また、コード中に通常のポイントでgc.collect()を呼び出すことを検討してください。私は、blobstoreとほとんど関係があり、gc.collect()を呼び出すことによって緩和されたソフトメモリの制限を超えていることに関するいくつかのSOの質問を見てきました。

+0

がgc.collect()を呼び出しているのはブロブストアにのみ適用されますか? – marcadian

関連する問題