私は現在、appengine上で動作するアプリケーションを持っていますが、遅延ライブラリを使用していくつかのジョブを実行しています。これらのタスクのほとんどは、Datastoreにクエリを実行してドキュメントを取得し、インデックスにエンティティを格納します(Search API)。これらのテーブルのいくつかは毎月置き換えられ、これらのタスクをすべてのエンティティ(4〜5M)で実行する必要があります。そのようなタスクのGoogle appengine:タスクキューのパフォーマンス
一つexempleは次のとおりです。
def addCompaniesToIndex(cursor=None, n_entities=0, mindate=None):
#get index
BATCH_SIZE = 200
cps, next_cursor, more = Company.query().\
fetch_page(BATCH_SIZE,
start_cursor=cursor)
doc_list = []
for i in range(0, len(cps)):
cp = cps[i]
#create a Index Document using the Datastore entity
#this document has only about 5 text fields and one date field
cp_doc = getCompanyDocument(cp)
doc_list.append(cp_doc)
index = search.Index(name='Company')
index.put(doc_list)
n_entities += len(doc_list)
if more:
logging.debug('Company: %d added to index', n_entities)
#to_put[:] = []
doc_list[:] = []
deferred.defer(addCompaniesToIndex,
cursor=next_cursor,
n_entities=n_entities,
mindate=mindate)
else:
logging.debug('Finished Company index creation (%d processed)', n_entities)
私は1つのタスクだけを実行すると、実行はので、私の5Mエンティティは約35時間かかりますインデクシング、延期タスクごとに4-5sの周りになります。
別のことは、同じキューで別の遅延タスクを使用する別のインデックス(たとえば、毎日の更新プログラムの1つ)で更新を実行すると、どちらも非常に遅く実行されることです。そして、遅延コール1回につき約10-15秒かかり始めます。これは耐え難いことです。
私の質問は、これを高速に実行し、毎回複数のジョブにプッシュキューをスケーリングする方法があるかどうかです。あるいは、私はこの問題に対して別のアプローチを使うべきですか?事前に
おかげで、あなたは実質的にタスクの実行をシリアル化しているaddCompaniesToIndex()
関数の最後にif more
ステートメントを配置することにより
ハイダン、あなたのアイデアを自分のコードに適用しましたが、データストアからの読み込みが実際にはインデックスにこれらのエンティティを挿入するよりも高価であるため、スループットの向上は期待したほど大きくはありません私は、読み取り操作のサイズを最小限に抑えることが役立つかもしれないと思う、いくつかのテストを行い、あなたに返す。 – Clds
そのような場合は、代わりにkeys_onlyクエリを実行し、次のタスクをエンキューした後、ページのキーのリストをアセンブルし、ドキュメントを取得してインデックスを更新するためのバッチリードを実行します。 –
BTW - データストアの読み込みコストについて疑問を実際に確認することができます:デベロッパーコンソールでアプリのログを確認します - ログエントリのいくつかは要求期間列に青いリンクがあります - リンクをクリックしてから "トレースを表示"ポップアップメニューではStackDriverにappstatsのようなトレースが表示されるので、その特定のリクエストにどこの時間が費やされたかを知ることができます。 –