PostgreSQLのデッドロックの問題を私は理解していません。
私は、Python、psycopg2モジュール、およびPostgresデータベースを使用して、ラウンドロビンのようなアルゴリズムを実装しようとしています。非常に短い間隔
のためにタスクのリストで、テーブル全体をロック - - (いくつかの制限で、少なくとも最近実行されるタスク)を実行するために作業を選び
が :
私は、次の操作を実行するためのアプリケーションの複数のインスタンスをしたいです - ラベルタスクので、他のインスタンスは、それを選択しないでください(インスタンスは1つだけ同時に同じタスクを実行することが許可されている)
- 実行タスク
- -
他のセッションもすることができるはず繰り返しテーブル
は、ロックを解除このテーブルの特定のフィールドを更新します。
突然、説明できないデッドロックが発生しています。私はPythonスクリプトをできるだけ単純化しました。可能な限り、すべてのステートメントの後でCommitを実行していますが、今でもデッドロックが発生しています。
何らかの理由で、デッドロックが発生するたびに、トランザクションの最初のステートメントになります。それはどのように可能ですか?私のテーブルには、トリガーや外部キーの制約、または複雑になるものはありません。私が思いつくことができる唯一の説明は、PostgreSQLがコミットの直後にロックを解放しないということです。それとも、私が期待しているように動作していないpsycopg2ですか?異なるセッションでステートメントを手動で実行することで問題を再現できませんでした。
デッドロックはまれですが、私はPostgreSQLの9.6.1とPythonでここPostgreSQLの予期しないデッドロック(psycopg2を使用中)
2.7.12は、私が実行コード(これは単なる簡略化されている実行しています
数時間ごとに少なくとも一度はそれらを得ますかサンプルIは)問題をキャッチするために作られた:そのティムで
Traceback (most recent call last):
File "/opt/workflow/bin/scan_simple.py", line 70, in <module>
process_task(task_struct['task_id'])
File "/opt/workflow/bin/scan_simple.py", line 58, in process_task
cursor.execute(sql)
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL: Process 21577 waits for ShareLock on transaction 39243027; blocked by process 21425.
Process 21425 waits for ShareLock on transaction 39243029; blocked by process 21102.
Process 21102 waits for AccessExclusiveLock on tuple (8,12) of relation 39933 of database 16390; blocked by process 21577.
HINT: See server log for query details.
CONTEXT: while updating tuple (8,12) in relation "wf_task"
Traceback (most recent call last):
File "/opt/workflow/bin/scan_simple.py", line 66, in <module>
task_struct=get_task(instance_id)
File "/opt/workflow/bin/scan_simple.py", line 27, in get_task
cursor.execute("SELECT 1 FROM wf_task FOR UPDATE")
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL: Process 21776 waits for ShareLock on transaction 39488839; blocked by process 21931.
Process 21931 waits for ShareLock on transaction 39488844; blocked by process 21776.
HINT: See server log for query details.
CONTEXT: while locking tuple (17,9) in relation “wf_task"
:
import psycopg2
import sys
import datetime
import time
sys.path.append('/opt/workflow/lib')
import config
import ovs_lib
instance_type='scan_master'
instance_id=sys.argv[1]
dbh=psycopg2.connect(dbname=config.values['pgsql']['db'], host=config.values['pgsql']['host'], port=int(config.values['pgsql']['port']), user=config.values['pgsql']['user'], password=config.values['pgsql']['pass'])
dbh.set_session(isolation_level='READ COMMITTED', autocommit=False)
cursor = dbh.cursor()
cursor.execute("SET search_path TO "+config.values['pgsql']['schema'])
def sanitize(string):
string=string.replace("'","''")
return string
def get_task(instance_id):
task_id=None
out_struct={}
instance_id=sanitize(instance_id)
#Lock whole table
dbh.commit() #Just in case
cursor.execute("SELECT 1 FROM wf_task FOR UPDATE") #Lock the table
cursor.execute("UPDATE wf_task SET scanner_instance_id=null WHERE scanner_instance_id='"+instance_id+"'") #release task from previous run
#Now get the task
sql ="SELECT t.task_id, st.scanner_function, t.parallel_runs\n"
sql+="FROM wf_task t\n"
sql+="JOIN wf_scanner_type st ON t.scanner_type_id=st.scanner_type_id\n"
sql+="WHERE status='A'\n"
sql+="AND t.scanner_instance_id is NULL\n"
sql+="AND last_scan_ts<=now()-scan_interval*interval '1 second'\n"
sql+="ORDER BY last_scan_ts\n"
sql+="LIMIT 1\n"
cursor.execute(sql)
cnt=cursor.rowcount
if cnt>0:
row=cursor.fetchone()
task_id=row[0]
sql ="UPDATE wf_task SET scanner_instance_id='"+instance_id+"',last_scan_ts=current_timestamp(3) WHERE task_id="+str(task_id)
cursor.execute(sql)
scanner_function=row[1]
parallel_runs=row[2]
out_struct['task_id']=task_id
out_struct['scanner_function']=scanner_function
out_struct['parallel_runs']=parallel_runs
dbh.commit()
return out_struct
def process_task(task_id):
sql="UPDATE wf_task SET submitted_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
cursor.execute(sql)
dbh.commit()
sql="UPDATE wf_task SET executed_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
cursor.execute(sql)
dbh.commit()
while True:
if not ovs_lib.check_control(instance_type, instance_id):
now_time=datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
print now_time+" Stop sygnal received"
exit(0)
task_struct=get_task(instance_id)
if 'task_id' not in task_struct:
time.sleep(1)
continue
process_task(task_struct['task_id'])
そして、ここでは、私が取得エラーの例です。 eこのスクリプトの6つのインスタンスを同時に実行していました データベース内で他のセッションはアクティブではありませんでした。
その後更新
今日、私はそのバージョン9.5を起動し、この質問
に非常に関連し、PostgreSQLは、私は自分のアプリケーションを設計しようとしていた問題を解決している、SKIP LOCKED文をサポートしていることはPostgresについて何か新しいことを学びました周り、およびキューまたはラウンドロビン・ソリューションのいくつかの並べ替えを実装しようとしているときに
あなたはPostgreSQLでは同時実行で苦労している場合は非常にエレガントな方法で、あなたは絶対にこれを読む必要があります。
https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/
をアンが、クエリに値を連結した文字列は、本当に悪い習慣です。特に[psycopgには優れたプレースホルダのサポートがあるため](http://initd.org/psycopg/docs/usage.html#passing-parameters-to-sql-queries)。ドキュメントを引用するには: "警告しないでください。決してPythonの文字列連結(+)や文字列パラメータ補間(%)を使用して、SQLクエリ文字列に変数を渡してはいけません。 –
[Here](http://dba.stackexchange.com/questions/68388/optimizing-concurrent-updates-in-postgres)および[ここ](http://stackoverflow.com/questions/12825663/can-two- select-for-update-statements-on-the-table-cause-a-deadlock)を使用すると、SELECT FOR UPDATEの任意の行順序に起因するデッドロックを明示的なORDER BYが回避すべきであることが示されます。また、テーブル全体をロックするのではなく、関連する行のみをロックすることも可能です。 'WHERE scanner_instance_id = instance_id'? –