スレッドを使ってPythonでプログラムを実行して、タスクを並列化しています。タスクは単純な文字列のマッチングです。長い文字列のデータベースに多数の短い文字列を対応させています。私は並列化しようとしたときに、短い文字列のリストをコアの数に等しい数のサブリストに分割し、別々のコアで別々に実行することに決めました。しかし、5または10コアでタスクを実行すると、1つのコアに比べて約2倍遅くなります。その理由は何でしょうか、それをどうすれば修正できるでしょうか?複数のコアでプログラムを実行する
編集:私のコードは、コードの重要な部分はpeptsで短い配列が遺伝子で長いシーケンスにマッチしますジョブ機能、範囲内にある
import sys
import os
import csv
import re
import threading
from Queue import Queue
from time import sleep
from threading import Lock
q_in = Queue()
q_out = Queue()
lock = Lock()
def ceil(nu):
if int(nu) == nu:
return int(nu)
else:
return int(nu) + 1
def opencsv(csvv):
with open(csvv) as csvfile:
peptides = []
reader = csv.DictReader(csvfile)
k = 0
lon = ""
for row in reader:
pept = str(row["Peptide"])
pept = re.sub("\((\+\d+\.\d+)\)", "", pept)
peptides.append(pept)
return peptides
def openfasta(fast):
with open(fast, "r") as fastafile:
dic = {}
for line in fastafile:
l = line.strip()
if l[0] == ">":
cur = l
dic[l] = ""
else:
dic[cur] = dic[cur] + l
return dic
def match(text, pattern):
text = list(text.upper())
pattern = list(pattern.upper())
ans = []
cur = 0
mis = 0
i = 0
while True:
if i == len(text):
break
if text[i] != pattern[cur]:
mis += 1
if mis > 1:
mis = 0
cur = 0
continue
cur = cur + 1
i = i + 1
if cur == len(pattern):
ans.append(i - len(pattern))
cur = 0
mis = 0
continue
return ans
def job(pepts, outfile, genes):
c = 0
it = 0
towrite = []
for i in pepts:
# if it % 1000 == 0:
# with lock:
# print float(it)/float(len(pepts))
it = it + 1
found = 0
for j in genes:
m = match(genes[j], i)
if len(m) > 0:
found = 1
remb = m[0]
wh = j
c = c + len(m)
if c > 1:
found = 0
c = 0
break
if found == 1:
towrite.append("\t".join([i, str(remb), str(wh)]) + "\n")
return towrite
def worker(outfile, genes):
s = q_in.qsize()
while True:
item = q_in.get()
print "\r{0:.2f}%".format(1 - float(q_in.qsize())/float(s))
if item is None:
break #kill thread
pepts = item
q_out.put(job(pepts, outfile, genes))
q_in.task_done()
def main(args):
num_worker_threads = int(args[4])
pept = opencsv(args[1])
l = len(pept)
howman = num_worker_threads
ll = ceil(float(l)/float(howman * 100))
remain = pept
pepties = []
while len(remain) > 0:
pepties.append(remain[0:ll])
remain = remain[ll:]
for i in pepties:
print len(i)
print l
print "Csv file loaded..."
genes = openfasta(args[2])
out = args[3]
print "Fasta file loaded..."
threads = []
with open(out, "w") as outfile:
for pepts in pepties:
q_in.put(pepts)
for i in range(num_worker_threads):
t = threading.Thread(target=worker, args=(outfile, genes,))
# t.daemon = True
t.start()
threads.append(t)
q_in.join() # run workers
# stop workers
for _ in range(num_worker_threads):
q_in.put(None)
for t in threads:
t.join()
# print(t)
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv))
下に見ることができます。
複数のプロセスは、スレッドよりもマルチCPU用のアプリケーションを作成する優れたアプローチです。しかし、どのように実装しましたか?あなたのコードを投稿してください! – qvpham
Pythonでの計算境界処理は、グローバルインタプリタロック(別名GIL)のためにマルチスレッドの恩恵を受けず、CPUの1つのコアのみを使用します。複数のコアを使用する場合は、マルチプロセッシングを実装する必要があります。どちらの方法も一定のオーバーヘッドを伴い、実際には処理速度を落として、処理に値するものではありません。 – martineau
多分代わりにマルチプロセッシングを使用すると、これは助けになるでしょうか? – user132290