私は顧客株式投資ポートフォリオを扱う取引アプリケーションを設計しました。取引アプリケーションのCPUを削減するためのタスクの最適化
私は2つのデータストアの種類使用しています:
- 株式を - ユニークな株式名とその日々の変化%が含まれています。
- UserTransactions - ユーザーが作成した在庫の特定の購入に関する情報が含まれています。購入の価値と、現在の購入の在庫の参照。
db.Modelのpythonモジュール:
class Stocks (db.Model):
stockname = db.StringProperty(multiline=True)
dailyPercentChange=db.FloatProperty(default=1.0)
class UserTransactions (db.Model):
buyer = db.UserProperty()
value=db.FloatProperty()
stockref = db.ReferenceProperty(Stocks)
私は、データベースを更新する必要が1時間に1回:Stocks
での日々の変化率を更新し、その参照UserTransactions
内のすべてのエンティティの値を更新株式。
すべての株式に比べて次のPythonモジュールの反復処理し、dailyPercentChangeプロパティを更新し、株式を参照するすべてのUserTransactions
エンティティを越えるとその値を更新するタスクを起動します。
Stocks.py
# Iterate over all stocks in datastore
for stock in Stocks.all():
# update daily percent change in datastore
db.run_in_transaction(updateStockTxn, stock.key())
# create a task to update all user transactions entities referring to this stock
taskqueue.add(url='/task', params={'stock_key': str(stock.key(), 'value' : self.request.get ('some_val_for_stock') })
def updateStockTxn(stock_key):
#fetch the stock again - necessary to avoid concurrency updates
stock = db.get(stock_key)
stock.dailyPercentChange= data.get('some_val_for_stock') # I get this value from outside
... some more calculations here ...
stock.put()
Task.py(/タスク)
# Amount of transaction per task
amountPerCall=10
stock=db.get(self.request.get("stock_key"))
# Get all user transactions which point to current stock
user_transaction_query=stock.usertransactions_set
cursor=self.request.get("cursor")
if cursor:
user_transaction_query.with_cursor(cursor)
# Spawn another task if more than 10 transactions are in datastore
transactions = user_transaction_query.fetch(amountPerCall)
if len(transactions)==amountPerCall:
taskqueue.add(url='/task', params={'stock_key': str(stock.key(), 'value' : self.request.get ('some_val_for_stock'), 'cursor': user_transaction_query.cursor() })
# Iterate over all transaction pointing to stock and update their value
for transaction in transactions:
db.run_in_transaction(updateUserTransactionTxn, transaction.key())
def updateUserTransactionTxn(transaction_key):
#fetch the transaction again - necessary to avoid concurrency updates
transaction = db.get(transaction_key)
transaction.value= transaction.value* self.request.get ('some_val_for_stock')
db.put(transaction)
問題:
現在のシステムは素晴らしい作品が、問題はそれがうまくスケーリングされていないということです...私は300回のユーザトランザクションで約100株を持っている、と私は更新ごとに時間を実行します。ダッシュボードでは、task.pyがCPUの約65%を占めていることがわかりました(Stock.pyは約20%-30%を占めています)、私はアプリケーションエンジンから私に与えられた6.5の無料CPU時間のほぼすべてを使用しています。課金を有効にしてCPUを増やすことは問題ありませんが、問題はシステムのスケーリングです... 100株に対して6.5 CPU時間を使用することは非常に貧弱です。
ここに示したものよりも、より優れた効率的な実装(または現在の実装に役立つ小さな変更)があれば、上記のようなシステムの要件を考えると、私は不思議に思っていました。
ありがとうございます!
ジョエル
100株のCPU時間が6.5時間は奇妙に聞こえますが、あなたはタスクキューをフォークするのではないと確信していますか? http://blog.notdot.net/2010/03/Task-Queue-task-chaining-done-right – systempuntoout
@systempuntoout - ログ内のそのページから200コードしか得られないので、フォーク爆弾ではないようですあなたがコードから見ることができるように、すべてのループは有限です):/ – Joel
ログを見て、キュー上のタスクの数を監視します。 * taskqueue.add *の後に例外がある可能性があります。 – systempuntoout