2017-01-27 18 views
1

非常に単純なマップ還元パイプラインを実装し、いくつかの問題を積み重ねました。大量のデータ(> 1000000)でGoogle App Engine MapReduceを使用する方法

クラウドデータストアの1つの種類のモデルに1000000を超えるエンティティがあり、各エンティティが不整合プロパティを持っているかどうかをすべて確認したいと考えています。

ここに私のコードスニペットがあります。

class User(ndb.model) 
    parent = ndb.KeyProperty(Group) # want to check if this key property actually exist 


class CheckKeyExistencePipeline(pipeline.Pipeline): 

    def map(self, entity): 
     logging.info(entity.urlsafe()) # added for debug 
     prop = getattr(entity, 'parent') 
     if not prop.get(): 
      yield 'parent does not exist: %s\n' % (entity.key.urlsafe()) 

    def run(self, modelname, shards): 
     mapreduce_pipeline.MapperPipeline(
      'parent check', 
      handler_spec='CheckKeyExistencePipeline.map', 
      input_reader_spec='mapreduce.input_readers.DatastoreInputReader', 
      output_writer_spec="mapreduce.output_writers.GoogleCloudStorageOutputWriter", 
      params={ 
       'input_reader': { 
        'entity_kind': 'User', 
       }, 
       'output_writer': { 
        'bucket_name': app_identity.get_default_gcs_bucket_name(), 
        'content_type': 'text/plain' 
       } 
      }, 
      shards=10) 

問題は、実際には次のようなエラーが表示されることです。

整備2つの要求が

を合計した後、私は10000のエンティティの周りのデータでこのコードを実行何ら問題はありません133メガバイトと128メガバイトのソフトプライベートメモリの制限を超えました。 問題とは何ですか。また、大量のデータを適用するためにこのパイプラインを正しく構成するにはどうすればよいですか?

EDIT1

私はNDBキャッシュを使用しないように変更され、それは何の改善も思いません。私はキャッシュがソースコードによれば既定値から外れていると思います。

https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/6e103ac52855c3214de3ec3721d6ec0e7edd5f77/python/src/mapreduce/util.py#L381-L383

def _set_ndb_cache_policy(): 
    """Tell NDB to never cache anything in memcache or in-process. 
    This ensures that entities fetched from Datastore input_readers via NDB 
    will not bloat up the request memory size and Datastore Puts will avoid 
    doing calls to memcache. Without this you get soft memory limit exits, 
    which hurts overall throughput. 
    """ 
    ndb_ctx = ndb.get_context() 
    ndb_ctx.set_cache_policy(lambda key: False) 
    ndb_ctx.set_memcache_policy(lambda key: False) 

私は問題を見つけるために、さらに調査をしました。私はマッパーパラメータprocessing_rateを10に設定し、shardsを100に設定して、タスクごとに1または2のエンティティのみを処理するようにしました。

ここにmapreduceの統計情報があります。グラフは合理的です。 (パイプラインがこの時点ではまだ終わっていません。)

mapreduce detail

しかし、私は労働者のタスクの1のトレースログを確認するとき、それは本当に奇妙です。 'map'関数が(デバッグログに基づいて)2回しか呼び出されないにもかかわらず、/datastore_v3.Next/datastore_v3.Getの束が表示されます。私はbatch_sizeを変更しなかったので、50にする必要があります。したがって、私の理解では、/datastore_v3.Nextは1回のみと呼ばれ、/datastore_v3.Getが2回呼び出されます。

tracelog

データベースに、このような多くのRPC呼び出しがトリガーされた理由を誰もが知っていますか?

EDIT2

繰り返しますが、私はさらに調査を行なったし、コードを簡単に作りました。マップ関数は、getという関数を呼び出して、ndb.Keyを使用してデータを取得するだけです。

class CheckKeyExistencePipeline(pipeline.Pipeline): 

    def map(self, entity): 
     logging.info('start') 
     entity.parent.get() 
     logging.info('end') 


    def run(self): 
     mapreduce_pipeline.MapperPipeline(
      'parent check', 
      handler_spec='CheckKeyExistencePipeline.map', 
      input_reader_spec='mapreduce.input_readers.DatastoreInputReader', 
      output_writer_spec="mapreduce.output_writers.GoogleCloudStorageOutputWriter", 
      params={ 
       'input_reader': { 
        'entity_kind': 'User', 
       }, 
       'output_writer': { 
        'bucket_name': app_identity.get_default_gcs_bucket_name(), 
        'content_type': 'text/plain' 
       } 
      }, 
      shards=10) 

StackdriverのTracelogはこのようなものです。

trace log

それはちょうどgetを呼んだが、それは「開始」と「終了」の間のRPCコールを何度もトリガします。これはちょっと変わったようで、このメモリ消費の理由の1つになる可能性があります。この通常の行動ですか?

+1

時々、あなたはndb'が提供する 'ことをインスタンスごとにキャッシュを使用して大量のメモリを埋めることができます。キャッシュを使わずにエンティティをフェッチすることができます: 'prop.get(use_cache = False)'。この場合、同じキーを複数回連続して取得しているわけではないので、キャッシュから何も取得できません。また、[インスタンスクラスを増やす](https: /cloud.google.com/appengine/docs/about-the-standard-environment#instance_classes)これらのリクエストを処理するサービスのために... – mgilson

+0

はい、キャッシュはここで重要なコンポーネントになります(一般にndbエンティティをキャッシュしますあなたがそれらを再利用することを知っていなければ役に立ちません。代わりにイテレータをキーベースにして、キャッシュの動作をよりよく制御できます。 –

+0

ああ私はそれを得た。私はキャッシュについては考えなかった。試してみます。ありがとう! – Kunihiko

答えて

0

私は最終的に問題が何であるかを知ったと思います。

MapReduceはndb.query.iterを使用し、eventloopsを使用して非同期RPC呼び出しを管理するという点が重要です。私のMapReduce呼び出しでは、MapReduceライブラリによってデータベースレコード(A)を取得するためにトリガされるものと、map関数(B)によって2種類のRPC呼び出しがトリガされます。

私のmap関数内でRPC呼び出しをトリガーしない場合、次のRPC呼び出しをトリガする場所はありません。つまり、次の(A)は、バッチ処理された50レコードが反復された後にのみトリガーされます。しかし、(B)は次のRPC呼び出しをトリガし、RPC呼び出しは連続してトリガされない(FIFOキューではない)ので、(A)はすべてのエンティティをフェッチするまで連続的にトリガされる可能性があります。

shardsを100に設定しましたが、それでも1つのシャードが10000レコードになります。したがって、これはソフトメモリの制限を超えます。

そして、私は10000にshardsを増やす際に、別のエラーが発生した...結論として

は、低メモリインスタンスを持つ大規模なデータでのMapReduceを使用する方法はありません。私は推測する。

詳しくは、下記の問題をご確認ください。

https://code.google.com/p/googleappengine/issues/detail?id=11648 https://code.google.com/p/googleappengine/issues/detail?id=9610(当初発行)

関連する問題