2017-02-23 8 views
0

私はGAE上で開発したサービスを持っています。アプリケーションは、計算の束を実行するために3秒ごとに「目盛りをつける」必要があります。これはシミュレーション型のゲームです。Google App Engineのタスクキューに重複したタスク(またはそれらを扱うタスク)を避ける

私は(明確にするため取り外しなどを扱ういくつかのエラー、)のようにdeferred APIとタスクキューを使用して、私は開始手動スケールのインスタンスを持っている:

@app.route('/_ah/start') 
def start(): 
    log.info('Ticker instance started') 
    return tick() 

@app.route('/tick') 
def tick(): 
    _do_tick() 
    deferred.defer(tick, _countdown=3) 
return 'Tick!', 200 

問題は時々私はで終わるということですこれは何らかの理由で二度スケジュールされています(一時的なエラー/タイムアウトによってタスクが再スケジュールされる可能性があります)、タスクキューに複数のタスクがあり、ゲームは3秒間に複数回ティッキングされます。

これにはどのように対処するのがよいでしょうか?

私が見る限り、キューに「Xのタスクはありますか?」と聞くことはできません。現在キューにあるアイテムの数はいくつですか?

私はこれをプッシュキューとして使用することを理解しています.1つの考え方はプルキューに切り替え、キューからアイテムをリースすることです。タグを付けてグループ化します。それは良いでしょうか?

私が本当に望むのは、3秒ごとに何かをスケジューリングするためのcronのようなスケジューラだけですが、GAEのスケジューラがその解決に向かない可能性があります。

私はちょうど例えば、起動ハンドラにすべてを移動することができます:私はこれを行うと、その単一の要求であることを認識されているよう

@app.route('/_ah/start') 
def start(): 
    log.info('Ticker instance started') 
while True: 
     _do_tick() 
     sleep(3) 

return 200 

しかし、私が見たものからは、ログは、更新されません決して完了しません。これにより、何が起きているのかをログで確認するのが少し難しくなります。現在、個々のティックは別々のリクエストログエントリとして表示されます。

また、上記が殺された場合は、とにかく自分自身のスケジュールを変更する必要があります。インスタンスがシャットダウンしようとしているときに例外をキャッチできる例外があることがわかっているので、あまり面倒ではないかもしれませんし、遅延タスクを再開して再開することもできます。

GAEでこれを処理する方がいいですか?

+0

3秒ごとに何かするのは非常に難しいようです。必要に応じて、必要に応じてすべての計算を行うことができますか?たとえば、ユーザーからのリクエストを受け取ったら、現在の時間に基づいて必要な計算をすべて実行し、その結果をユーザーに提示しますか? –

+0

悲しいかな、これは連続走行シミュレーションです。これは、データベース内の多数のデータを更新していて、同時に数〜400のゲームクライアントが同時にアクセスしています。 –

答えて

0

https://stackoverflow.com/a/36621588/4495081で言及されているように、_retry_optionsオプションの引数にtask_retry_limitの値を0に設定できます。

障害の正当な理由が存在する場合、ティッキングジョブは永久に停止します。また、ジョブが実行された最後の時間を追跡し、定期的にそのカチカチがまだ実行されているチェックしていない場合は、それを再起動するにcronベースの健全性チェックの仕事を持っている場合があります。

+0

私は、ジョブが最後に実行された時間を保存し、3秒前に来るジョブをスキップしようとしていると思っていましたが、その上の簿記はおそらく高すぎます。私はすでにDBコールの数を抑えようとしています。 –

+0

必要がある場合 - データストアの代わりにmemcache(無料)を使用することを検討してください。memcacheデータが消失した場合の復旧の準備をしてください。 –

+0

私はすでにmemcacheを使用しています。 'tick'プロセスはデータベースのクエリと計算を行い、結果はmemcacheにキャッシュされます。その後、サービスにアクセスしているクライアントは、memcache(ヒット)かdb(miss)から再計算されたデータを取得します。 –

1

重複を検出/排除する方法はありませんが、別のメカニズムを使用して回避しました。むしろスケジューラとしてタスクキューに頼るよりも、私は手動でスケールのインスタンスで私自身のスケジューラ・ループを実行します。

TICKINTERVAL = 3 

@app.route('/_ah/start') 
def scheduler(): 
    log.info('Ticker instance started') 
    while True: 
     if game.is_running(): 
      task = taskqueue.add(
       url='/v1/game/tick', 
       queue_name='tickqueue', 
       method='PUT', 
       target='tickworker', 
       ) 
     else: 
      log.info('Tick skipped as game stopped') 
     db_session.rollback() 
     sleep(TICKINTERVAL) 

私はキューがdoesnのqueue.yaml

queue: 
- name: tickqueue 
    rate: 5/s 
    max_concurrent_requests: 1 
    retry_parameters: 
    task_retry_limit: 0 
    task_age_limit: 1m 

に私自身のキュー、tickqueueを定義していますタスクを再試行し、1分以上そこに残ったタスクはキャンセルされます。私は最大並行性を1に設定して、一度に1つの項目を処理する試みのみを行っています。

時には 'tick'が3秒以上かかると、キューにバックアップされますが、再度速度が上がるとキューはクリアされます。ダニが平均して3秒以上かかると、1分以上キューに入っていたタスクは破棄されます。

これは、各ダニのログエントリを取得するという利点があります(/_ah/deferredとは対照的に、/v1/game/tickと呼ばれます)。欠点は、スケジューラインスタンスの処理要求を/_ah/startが完了するまで実行しないようにすることができないため、スケジューラのインスタンスとワーカーのインスタンスを1つずつ使用する必要があることです。

関連する問題