私はJiraの変更ログの履歴データを処理しています。大量のデータと、処理時間のほとんどがI/Oベースであるため、非同期的なアプローチがうまくいくと考えました。concurrent.futures.ThreadPoolExecutorでデッドロックを起こさずにロックを使用する方法?
私はjira-python
APIを介して要求を行う機能に供給していますすべてのissue_id
年代、のリストを持って、dict
に情報を抽出し、DictWriter
に渡されて、それを書き出します。それをスレッドセーフにするために、私はthreading
モジュールからLock()
をインポートしました。これも渡しています。テストでは、ある時点でデッドロックが発生してハングするだけです。私は、タスクがお互いに依存していると、ハングアップする可能性があると書かれていました。これを防ぐ方法を教えてください。ここで
は、参考のために私のコードです:
(この時点では、コード内のリストは、すべてのISSUE_ID年代にkeys
が呼ばれている)
def write_issue_history(
jira_instance: JIRA,
issue_id: str,
writer: DictWriter,
lock: Lock):
logging.debug('Now processing data for issue {}'.format(issue_id))
changelog = jira_instance.issue(issue_id, expand='changelog').changelog
for history in changelog.histories:
created = history.created
for item in history.items:
to_write = dict(issue_id=issue_id)
to_write['date'] = created
to_write['field'] = item.field
to_write['changed_from'] = item.fromString
to_write['changed_to'] = item.toString
clean_data(to_write)
add_etl_fields(to_write)
print(to_write)
with lock:
print('Lock obtained')
writer.writerow(to_write)
if __name__ == '__main__':
with open('outfile.txt', 'w') as outf:
writer = DictWriter(
f=outf,
fieldnames=fieldnames,
delimiter='|',
extrasaction='ignore'
)
writer_lock = Lock()
with ThreadPoolExecutor(max_workers=5) as exec:
for key in keys[:5]:
exec.submit(
write_issue_history,
j,
key,
writer,
writer_lock
)
EDIT:それは私がされていますまた、非常に可能ですJira APIによって抑制されます。