2016-05-10 9 views
0

スレッドを使ってPythonでプログラムを実行して、タスクを並列化しています。タスクは単純な文字列のマッチングです。長い文字列のデータベースに多数の短い文字列を対応させています。私は並列化しようとしたときに、短い文字列のリストをコアの数に等しい数のサブリストに分割し、別々のコアで別々に実行することに決めました。しかし、5または10コアでタスクを実行すると、1つのコアに比べて約2倍遅くなります。その理由は何でしょうか、それをどうすれば修正できるでしょうか?複数のコアでプログラムを実行する

編集:私のコードは、コードの重要な部分はpeptsで短い配列が遺伝子で長いシーケンスにマッチしますジョブ機能、範囲内にある

import sys 
import os 
import csv 
import re 
import threading 
from Queue import Queue 
from time import sleep 
from threading import Lock 


q_in = Queue() 
q_out = Queue() 
lock = Lock() 

def ceil(nu): 
    if int(nu) == nu: 
     return int(nu) 
    else: 
     return int(nu) + 1 

def opencsv(csvv): 
    with open(csvv) as csvfile: 
     peptides = [] 
     reader = csv.DictReader(csvfile) 
     k = 0 
     lon = "" 
     for row in reader: 
      pept = str(row["Peptide"]) 
      pept = re.sub("\((\+\d+\.\d+)\)", "", pept) 
      peptides.append(pept) 
     return peptides 

def openfasta(fast): 
    with open(fast, "r") as fastafile: 
     dic = {} 
     for line in fastafile: 
      l = line.strip() 
      if l[0] == ">": 
       cur = l 
       dic[l] = "" 
      else: 
       dic[cur] = dic[cur] + l 
     return dic 

def match(text, pattern): 
    text = list(text.upper()) 
    pattern = list(pattern.upper()) 
    ans = [] 
    cur = 0 
    mis = 0 
    i = 0 
    while True: 
     if i == len(text): 
      break 
     if text[i] != pattern[cur]: 
      mis += 1 
      if mis > 1: 
       mis = 0 
       cur = 0 
       continue 
     cur = cur + 1 
     i = i + 1 
     if cur == len(pattern): 
      ans.append(i - len(pattern)) 
      cur = 0 
      mis = 0 
      continue 
    return ans 

def job(pepts, outfile, genes): 
    c = 0 
    it = 0 
    towrite = [] 
    for i in pepts: 
     # if it % 1000 == 0: 
      # with lock: 
       # print float(it)/float(len(pepts)) 
     it = it + 1 
     found = 0 
     for j in genes: 
      m = match(genes[j], i) 
      if len(m) > 0: 
       found = 1 
       remb = m[0] 
       wh = j 
       c = c + len(m) 
       if c > 1: 
        found = 0 
        c = 0 
        break 
     if found == 1: 
      towrite.append("\t".join([i, str(remb), str(wh)]) + "\n") 
    return towrite 


def worker(outfile, genes): 
    s = q_in.qsize() 
    while True: 
     item = q_in.get() 
     print "\r{0:.2f}%".format(1 - float(q_in.qsize())/float(s)) 
     if item is None: 
      break #kill thread 
     pepts = item 
     q_out.put(job(pepts, outfile, genes)) 
     q_in.task_done() 

def main(args): 
    num_worker_threads = int(args[4]) 

    pept = opencsv(args[1]) 
    l = len(pept) 
    howman = num_worker_threads 
    ll = ceil(float(l)/float(howman * 100)) 
    remain = pept 
    pepties = [] 
    while len(remain) > 0: 
     pepties.append(remain[0:ll]) 
     remain = remain[ll:] 
    for i in pepties: 
     print len(i) 
    print l 

    print "Csv file loaded..." 
    genes = openfasta(args[2]) 
    out = args[3] 
    print "Fasta file loaded..." 

    threads = [] 

    with open(out, "w") as outfile: 
     for pepts in pepties: 
      q_in.put(pepts) 

     for i in range(num_worker_threads): 
      t = threading.Thread(target=worker, args=(outfile, genes,)) 
      # t.daemon = True 
      t.start() 
      threads.append(t) 

     q_in.join() # run workers 

     # stop workers 
     for _ in range(num_worker_threads): 
      q_in.put(None) 
     for t in threads: 
      t.join() 
      # print(t) 

    return 0 
if __name__ == "__main__": 
    sys.exit(main(sys.argv)) 

下に見ることができます。

+3

複数のプロセスは、スレッドよりもマルチCPU用のアプリケーションを作成する優れたアプローチです。しかし、どのように実装しましたか?あなたのコードを投稿してください! – qvpham

+0

Pythonでの計算境界処理は、グローバルインタプリタロック(別名GIL)のためにマルチスレッドの恩恵を受けず、CPUの1つのコアのみを使用します。複数のコアを使用する場合は、マルチプロセッシングを実装する必要があります。どちらの方法も一定のオーバーヘッドを伴い、実際には処理速度を落として、処理に値するものではありません。 – martineau

+0

多分代わりにマルチプロセッシングを使用すると、これは助けになるでしょうか? – user132290

答えて

0

これは、CPythonのGIL(グローバルインタープリタロック)のためです。

グローバルインタプリタロック(GIL)は、複数のネイティブスレッドがPythonバイトコードを一度に実行できないようにするミューテックスです。

PyCon 2010のDavid Beazleyのpresentationでは、GILの詳細について説明しました。そして、32ページから34ページまで、彼は、単一コアで稼働する場合と比べて複数のコアで実行した場合、同じマルチスレッドコード(CPUバウンド計算の)がパフォーマンスを低下させる理由を説明しました。

(シングルコアとは)別の実行をスレッドが、GIL

上(別のコア上で)あなたは、複数のコアを持つ

を想像よりもはるかに 少ない頻度で、実行可能スレッドが同時にスケジュール設定を取得切り替えるとの戦い

Davidのthis experiment resultは、「CPUの数が増えるにつれてスレッド切り替えがより速くなる方法」を視覚化します。

job関数には、3レベルのネストされたループ(jobに2つ、matchに2つ)があるため、I/OにはいくつかのI/Oが含まれていますが、CPUバインド計算によく似ています。

コードを複数の処理に変更すると、複数のコアを活用してパフォーマンスを向上させることができます。 しかし、計算量を考慮すると、計算量の並列化によるメリットがプロセス間通信などの複数の処理で発生するオーバーヘッドよりもはるかに大きくなるかどうかによって異なります。

関連する問題