2016-07-26 9 views
0

私はデータベースからシングルレコード(行)を取得する必要があるセルリーワーカーの束を持っています。Rethinkdb - 特定の条件のアトミック検索/更新行

"last_used"フィールドから60秒が経過した場合にのみ返され、返されるとすぐに "last_used"が現在の時間で更新されるはずです(遅延が0で無制限に再試行する他のワーカーは、同じもの)

これをすべてdbレベルで行うことはできますか?私はこの "last_used"フィールドを更新する他のソースを持っていません。

ここでは、単一行の外観を示します。

{"id": "..." "last_used": "2016-06-31 00:37:21.241833", "item": "string"} 

私が試した:

conn = r.connect(host='localhost', port=28015) 
do = r.db('database').table('tb').order_by(index=("last_used")).limit(1).update(
{'last_used': r.now()} 
, return_changes=True).run(conn) 

をそして、それは動作しません、それが変更される前に、複数の労働者が同じ行を返されます。

答えて

0

私はそれをやったと思いますが、50人の争いでそれに対して50人の労働者を挑戦しました。しかし、私は、私はこれを期待しborrowed from here.

@app.task(bind=True, default_retry_delay=0, max_retries=999) 
def ss(self):   
    conn = r.connect(host='localhost', port=28015) 
    do = r.db('').table('').order_by("last_used").filter(
    r.now() - r.row['last_used'] > 10 
    ).filter({'status': 1}).limit(1).update(
    r.branch(r.row["status"] == 1, {'status': 2, "last_used": r.now()}, {}), 
    return_changes=True).run(conn) 


    try: 
     got_item = do['changes'][0]['new_val']['id'] 
     last_used = do['changes'][0]['new_val']['last_used'] 
    except: 
     # print('error', do) 
     raise self.retry() 

    if got_item: 
     # Do stuff that required unique row here.... 
     print(got_item,'\n',last_used) 
     time.sleep(random.randrange(1,5)) 
     r.db('').table('').get(got_item).update({"status": 1}).run(conn) 

#start workers 
for i in range(50): 
    ss.delay() 

、まだrethinkdbと初心者ですし、この上の自分の考えを聞くのが大好きだ、労働者は「もしgot_item」ブロック内でユニークなアイテムでの動作が保証されています。