2017-03-11 35 views
2


PostgreSQLのデッドロックの問題を私は理解していません。
私は、Python、psycopg2モジュール、およびPostgresデータベースを使用して、ラウンドロビンのようなアルゴリズムを実装しようとしています。非常に短い間隔
のためにタスクのリストで、テーブル全体をロック - - (いくつかの制限で、少なくとも最近実行されるタスク)を実行するために作業を選び

が :
私は、次の操作を実行するためのアプリケーションの複数のインスタンスをしたいです - ラベルタスクので、他のインスタンスは、それを選択しないでください(インスタンスは1つだけ同時に同じタスクを実行することが許可されている)
- 実行タスク
- -
他のセッションもすることができるはず繰り返しテーブル
は、ロックを解除このテーブルの特定のフィールドを更新します。
突然、説明できないデッドロックが発生しています。私はPythonスクリプトをできるだけ単純化しました。可能な限り、すべてのステートメントの後でCommitを実行していますが、今でもデッドロックが発生しています。
何らかの理由で、デッドロックが発生するたびに、トランザクションの最初のステートメントになります。それはどのように可能ですか?私のテーブルには、トリガーや外部キーの制約、または複雑になるものはありません。私が思いつくことができる唯一の説明は、PostgreSQLがコミットの直後にロックを解放しないということです。それとも、私が期待しているように動作していないpsycopg2ですか?異なるセッションでステートメントを手動で実行することで問題を再現できませんでした。
デッドロックはまれですが、私はPostgreSQLの9.6.1とPythonでここPostgreSQLの予期しないデッドロック(psycopg2を使用中)

2.7.12は、私が実行コード(これは単なる簡略化されている実行しています

数時間ごとに少なくとも一度はそれらを得ますかサンプルIは)問題をキャッチするために作られた:そのティムで

Traceback (most recent call last): 
    File "/opt/workflow/bin/scan_simple.py", line 70, in <module> 
process_task(task_struct['task_id']) 
    File "/opt/workflow/bin/scan_simple.py", line 58, in process_task 
cursor.execute(sql) 
psycopg2.extensions.TransactionRollbackError: deadlock detected 
DETAIL: Process 21577 waits for ShareLock on transaction 39243027; blocked by process 21425. 
Process 21425 waits for ShareLock on transaction 39243029; blocked by process 21102. 
Process 21102 waits for AccessExclusiveLock on tuple (8,12) of relation 39933 of database 16390; blocked by process 21577. 
HINT: See server log for query details. 
CONTEXT: while updating tuple (8,12) in relation "wf_task" 

Traceback (most recent call last): 
    File "/opt/workflow/bin/scan_simple.py", line 66, in <module> 
    task_struct=get_task(instance_id) 
    File "/opt/workflow/bin/scan_simple.py", line 27, in get_task 
    cursor.execute("SELECT 1 FROM wf_task FOR UPDATE") 
psycopg2.extensions.TransactionRollbackError: deadlock detected 
DETAIL: Process 21776 waits for ShareLock on transaction 39488839; blocked by process 21931. 
Process 21931 waits for ShareLock on transaction 39488844; blocked by process 21776. 
HINT: See server log for query details. 
CONTEXT: while locking tuple (17,9) in relation “wf_task" 

import psycopg2 
import sys 
import datetime 
import time 
sys.path.append('/opt/workflow/lib') 
import config 
import ovs_lib 


instance_type='scan_master' 
instance_id=sys.argv[1] 

dbh=psycopg2.connect(dbname=config.values['pgsql']['db'], host=config.values['pgsql']['host'], port=int(config.values['pgsql']['port']), user=config.values['pgsql']['user'], password=config.values['pgsql']['pass']) 
dbh.set_session(isolation_level='READ COMMITTED', autocommit=False) 
cursor = dbh.cursor() 
cursor.execute("SET search_path TO "+config.values['pgsql']['schema']) 

def sanitize(string): 
    string=string.replace("'","''") 
    return string 

def get_task(instance_id): 
    task_id=None 
    out_struct={} 
    instance_id=sanitize(instance_id) 
    #Lock whole table 
    dbh.commit() #Just in case 
    cursor.execute("SELECT 1 FROM wf_task FOR UPDATE") #Lock the table 
    cursor.execute("UPDATE wf_task SET scanner_instance_id=null WHERE scanner_instance_id='"+instance_id+"'") #release task from previous run 
    #Now get the task 
    sql ="SELECT t.task_id, st.scanner_function, t.parallel_runs\n" 
    sql+="FROM wf_task t\n" 
    sql+="JOIN wf_scanner_type st ON t.scanner_type_id=st.scanner_type_id\n" 
    sql+="WHERE status='A'\n" 
    sql+="AND t.scanner_instance_id is NULL\n" 
    sql+="AND last_scan_ts<=now()-scan_interval*interval '1 second'\n" 
    sql+="ORDER BY last_scan_ts\n" 
    sql+="LIMIT 1\n" 
    cursor.execute(sql) 
    cnt=cursor.rowcount 
    if cnt>0: 
    row=cursor.fetchone() 
    task_id=row[0] 
    sql ="UPDATE wf_task SET scanner_instance_id='"+instance_id+"',last_scan_ts=current_timestamp(3) WHERE task_id="+str(task_id) 
    cursor.execute(sql) 
    scanner_function=row[1] 
    parallel_runs=row[2] 
    out_struct['task_id']=task_id 
    out_struct['scanner_function']=scanner_function 
    out_struct['parallel_runs']=parallel_runs 
    dbh.commit() 
    return out_struct 

def process_task(task_id): 
    sql="UPDATE wf_task SET submitted_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()" 
    cursor.execute(sql) 
    dbh.commit() 
    sql="UPDATE wf_task SET executed_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()" 
    cursor.execute(sql) 
    dbh.commit() 

while True: 
    if not ovs_lib.check_control(instance_type, instance_id): 
    now_time=datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S') 
    print now_time+" Stop sygnal received" 
    exit(0) 
    task_struct=get_task(instance_id) 
    if 'task_id' not in task_struct: 
    time.sleep(1) 
    continue 
    process_task(task_struct['task_id']) 

そして、ここでは、私が取得エラーの例です。 eこのスクリプトの6つのインスタンスを同時に実行していました データベース内で他のセッションはアクティブではありませんでした。

その後更新
今日、私はそのバージョン9.5を起動し、この質問
に非常に関連し、PostgreSQLは、私は自分のアプリケーションを設計しようとしていた問題を解決している、SKIP LOCKED文をサポートしていることはPostgresについて何か新しいことを学びました周り、およびキューまたはラウンドロビン・ソリューションのいくつかの並べ替えを実装しようとしているときに
あなたはPostgreSQLでは同時実行で苦労している場合は非常にエレガントな方法で、あなたは絶対にこれを読む必要があります。
https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/

+0

をアンが、クエリに値を連結した文字列は、本当に悪い習慣です。特に[psycopgには優れたプレースホルダのサポートがあるため](http://initd.org/psycopg/docs/usage.html#passing-parameters-to-sql-queries)。ドキュメントを引用するには: "警告しないでください。決してPythonの文字列連結(+)や文字列パラメータ補間(%)を使用して、SQLクエリ文字列に変数を渡してはいけません。 –

+0

[Here](http://dba.stackexchange.com/questions/68388/optimizing-concurrent-updates-in-postgres)および[ここ](http://stackoverflow.com/questions/12825663/can-two- select-for-update-statements-on-the-table-cause-a-deadlock)を使用すると、SELECT FOR UPDATEの任意の行順序に起因するデッドロックを明示的なORDER BYが回避すべきであることが示されます。また、テーブル全体をロックするのではなく、関連する行のみをロックすることも可能です。 'WHERE scanner_instance_id = instance_id'? –

答えて

0

問題はおそらく最初のSELECT ... FOR UPDATEの順次スキャンでは常に同じ順序で行が返されるわけではないため、この文を同時に実行すると異なる順序で表の行がロックされる可能性があります。これはあなたが経験するデッドロックにつながります。

増加良さにはいくつかのソリューションがあります:

  • 私はこのアップデートのためにテーブル全体をロックするための技術は、パフォーマンスのために恐ろしいと思いますが、あなたのコードを維持する上で主張すれば、あなたが設定することができますsynchronize_seqscansoffとなり、すべての順次スキャンでは同じ順序で行が戻されます。ただし、テーブル内のすべての行をロックする必要はありません。

    • 不要な順次スキャンが発生します。

    • 安全ではありません。誰かがUPDATEを実行している行と時刻をロックする時間の間に誰かがINSERT新しい行を表示できます。

  • あなたは本当に代わりに、テーブル内のすべての行をロックするのLOCK TABLE文を使用し、テーブル全体をロックします。それもデッドロックを取り除くでしょう。

  • 最良の解決策は、おそらくUPDATEで行をロックすることです。デッドロックを回避するには、PostgreSQLがUPDATEに使用する実行計画を調べます。これは、インデックススキャンまたはシーケンシャルスキャンになります。インデックススキャンでは、特定の順序で行が返されるため安全です。シーケンシャルスキャンの場合は、唯一の取引のために理想的には、上記のsynchronize_seqscans機能を無効にする:脇興味深い質問へ

    START TRANSACTION; 
    SET LOCAL synchronize_seqscans = off; 
    /* your UPDATEs go here */ 
    COMMIT; 
    
+0

ありがとうLaurenz!実際には意味があります SELECT FOR UPDATEの代わりにLOCK TABLEステートメントを使用しようとします。 Postgresがテーブル全体を一度にロックするのではなく、一度に行をロックすることは決してできませんでしたが、現在の動作を説明します。 私はそれをしばらく実行して、デッドロックが表示されるかどうかを確認します – Serge

+0

残念ながら、この場合はテーブル全体をロックするのが最良の方法です。特定のレコードをUPDATEでロックすると、望ましくない状況が発生します。私は更新したいレコードの主キーを知らない。私の更新ステートメントは、既に別のインスタンスによって保持されている可能性があります(そして、すべてのインスタンスが同じ条件のレコードを探しているため)。 – Serge

+0

ロックが解除されると、条件が変わり、それはもはや必要なレコードにはなりません。私はレコードを更新しないで終了します(WHERE句がaferロックが解放されたので満足していないので)。したがって、スクリプトは前回のトランザクションを待って終了し、行を更新しなくなります。 – Serge

関連する問題