非常に単純なマップ還元パイプラインを実装し、いくつかの問題を積み重ねました。大量のデータ(> 1000000)でGoogle App Engine MapReduceを使用する方法
クラウドデータストアの1つの種類のモデルに1000000を超えるエンティティがあり、各エンティティが不整合プロパティを持っているかどうかをすべて確認したいと考えています。
ここに私のコードスニペットがあります。
class User(ndb.model)
parent = ndb.KeyProperty(Group) # want to check if this key property actually exist
class CheckKeyExistencePipeline(pipeline.Pipeline):
def map(self, entity):
logging.info(entity.urlsafe()) # added for debug
prop = getattr(entity, 'parent')
if not prop.get():
yield 'parent does not exist: %s\n' % (entity.key.urlsafe())
def run(self, modelname, shards):
mapreduce_pipeline.MapperPipeline(
'parent check',
handler_spec='CheckKeyExistencePipeline.map',
input_reader_spec='mapreduce.input_readers.DatastoreInputReader',
output_writer_spec="mapreduce.output_writers.GoogleCloudStorageOutputWriter",
params={
'input_reader': {
'entity_kind': 'User',
},
'output_writer': {
'bucket_name': app_identity.get_default_gcs_bucket_name(),
'content_type': 'text/plain'
}
},
shards=10)
問題は、実際には次のようなエラーが表示されることです。
整備2つの要求が
を合計した後、私は10000のエンティティの周りのデータでこのコードを実行何ら問題はありません133メガバイトと128メガバイトのソフトプライベートメモリの制限を超えました。 問題とは何ですか。また、大量のデータを適用するためにこのパイプラインを正しく構成するにはどうすればよいですか?
EDIT1
私はNDBキャッシュを使用しないように変更され、それは何の改善も思いません。私はキャッシュがソースコードによれば既定値から外れていると思います。
def _set_ndb_cache_policy():
"""Tell NDB to never cache anything in memcache or in-process.
This ensures that entities fetched from Datastore input_readers via NDB
will not bloat up the request memory size and Datastore Puts will avoid
doing calls to memcache. Without this you get soft memory limit exits,
which hurts overall throughput.
"""
ndb_ctx = ndb.get_context()
ndb_ctx.set_cache_policy(lambda key: False)
ndb_ctx.set_memcache_policy(lambda key: False)
私は問題を見つけるために、さらに調査をしました。私はマッパーパラメータprocessing_rate
を10に設定し、shards
を100に設定して、タスクごとに1または2のエンティティのみを処理するようにしました。
ここにmapreduceの統計情報があります。グラフは合理的です。 (パイプラインがこの時点ではまだ終わっていません。)
しかし、私は労働者のタスクの1のトレースログを確認するとき、それは本当に奇妙です。 'map'関数が(デバッグログに基づいて)2回しか呼び出されないにもかかわらず、/datastore_v3.Next
と/datastore_v3.Get
の束が表示されます。私はbatch_sizeを変更しなかったので、50にする必要があります。したがって、私の理解では、/datastore_v3.Next
は1回のみと呼ばれ、/datastore_v3.Get
が2回呼び出されます。
データベースに、このような多くのRPC呼び出しがトリガーされた理由を誰もが知っていますか?
EDIT2
繰り返しますが、私はさらに調査を行なったし、コードを簡単に作りました。マップ関数は、get
という関数を呼び出して、ndb.Key
を使用してデータを取得するだけです。
class CheckKeyExistencePipeline(pipeline.Pipeline):
def map(self, entity):
logging.info('start')
entity.parent.get()
logging.info('end')
def run(self):
mapreduce_pipeline.MapperPipeline(
'parent check',
handler_spec='CheckKeyExistencePipeline.map',
input_reader_spec='mapreduce.input_readers.DatastoreInputReader',
output_writer_spec="mapreduce.output_writers.GoogleCloudStorageOutputWriter",
params={
'input_reader': {
'entity_kind': 'User',
},
'output_writer': {
'bucket_name': app_identity.get_default_gcs_bucket_name(),
'content_type': 'text/plain'
}
},
shards=10)
StackdriverのTracelogはこのようなものです。
それはちょうどget
を呼んだが、それは「開始」と「終了」の間のRPCコールを何度もトリガします。これはちょっと変わったようで、このメモリ消費の理由の1つになる可能性があります。この通常の行動ですか?
時々、あなたはndb'が提供する 'ことをインスタンスごとにキャッシュを使用して大量のメモリを埋めることができます。キャッシュを使わずにエンティティをフェッチすることができます: 'prop.get(use_cache = False)'。この場合、同じキーを複数回連続して取得しているわけではないので、キャッシュから何も取得できません。また、[インスタンスクラスを増やす](https: /cloud.google.com/appengine/docs/about-the-standard-environment#instance_classes)これらのリクエストを処理するサービスのために... – mgilson
はい、キャッシュはここで重要なコンポーネントになります(一般にndbエンティティをキャッシュしますあなたがそれらを再利用することを知っていなければ役に立ちません。代わりにイテレータをキーベースにして、キャッシュの動作をよりよく制御できます。 –
ああ私はそれを得た。私はキャッシュについては考えなかった。試してみます。ありがとう! – Kunihiko