2016-07-14 16 views
1

私はJiraの変更ログの履歴データを処理しています。大量のデータと、処理時間のほとんどがI/Oベースであるため、非同期的なアプローチがうまくいくと考えました。concurrent.futures.ThreadPoolExecutorでデッドロックを起こさずにロックを使用する方法?

私はjira-python APIを介して要求を行う機能に供給していますすべてのissue_id年代、のリストを持って、dictに情報を抽出し、DictWriterに渡されて、それを書き出します。それをスレッドセーフにするために、私はthreadingモジュールからLock()をインポートしました。これも渡しています。テストでは、ある時点でデッドロックが発生してハングするだけです。私は、タスクがお互いに依存していると、ハングアップする可能性があると書かれていました。これを防ぐ方法を教えてください。ここで

は、参考のために私のコードです:

(この時点では、コード内のリストは、すべてのISSUE_ID年代にkeysが呼ばれている)

def write_issue_history(
     jira_instance: JIRA, 
     issue_id: str, 
     writer: DictWriter, 
     lock: Lock): 
    logging.debug('Now processing data for issue {}'.format(issue_id)) 
    changelog = jira_instance.issue(issue_id, expand='changelog').changelog 

    for history in changelog.histories: 
     created = history.created 
     for item in history.items: 
      to_write = dict(issue_id=issue_id) 
      to_write['date'] = created 
      to_write['field'] = item.field 
      to_write['changed_from'] = item.fromString 
      to_write['changed_to'] = item.toString 
      clean_data(to_write) 
      add_etl_fields(to_write) 
      print(to_write) 
      with lock: 
       print('Lock obtained') 
       writer.writerow(to_write) 

if __name__ == '__main__': 
    with open('outfile.txt', 'w') as outf: 
       writer = DictWriter(
        f=outf, 
        fieldnames=fieldnames, 
        delimiter='|', 
        extrasaction='ignore' 
       ) 
       writer_lock = Lock() 
       with ThreadPoolExecutor(max_workers=5) as exec: 
        for key in keys[:5]: 
         exec.submit(
          write_issue_history, 
          j, 
          key, 
          writer, 
          writer_lock 
         ) 

EDIT:それは私がされていますまた、非常に可能ですJira APIによって抑制されます。

答えて

0

futsという名前のリストにexecの結果を格納し、その結果を得るためにそのリストをループして、結果を得る必要があります。

(つまり、より多くの従来だと、私はexecutorexecをもチャンスだろうと、それは内蔵の上書き回避)

from traceback import print_exc 

... 

with ThreadPoolExecutor(max_workers=5) as executor: 
    futs = [] 
    for key in keys[:5]: 
     futs.append(executor.submit(
      write_issue_history, 
      j, 
      key, 
      writer, 
      writer_lock) 
     ) 

for fut in futs: 
    try: 
     fut.result() 
    except Exception as e: 
     print_exc() 
関連する問題