0
ファイルを解析し、大きなリストに変換するスクリプトを作成しようとしています。このリストは後で並列処理されるはずです。私はPythonのマルチプロセッシングのいくつかの実装を試みましたが、それらはすべて順番に実行されるようです。Pythonマルチプロセッシング共有リスト
def grouper(n, iterable, padvalue=None):
"""grouper(3, 'abcdefg', 'x') -->
('a','b','c'), ('d','e','f'), ('g','x','x')"""
return izip_longest(*[iter(iterable)]*n, fillvalue=padvalue)
def createRecords(givenchunk):
for i1 in range(len(givenchunk)):
<create somedata>
records.append(somedata)
if __name__=='__main__':
manager = Manager()
parsedcdrs = manager.list([])
records = manager.list([])
<some general processing here which creates a shared list "parsedcdrs". Uses map to create a process "p" in some def which is terminated afterwards.>
# Get available cpus
cores = multiprocessing.cpu_count()
# First implementation with map with map.
t = multiprocessing.Pool(cores)
print "Map processing with chunks containing 5000"
t.map(createRecords, zip(parsedcdr), 5000)
# Second implementation with async.
t = multiprocessing.Pool(cores)
for chunk in grouper(5000, parsedcdr):
print "Async processing with chunks containing 5000"
t.apply_async(createRecords, args=(chunk,), callback=log_result)
t.close()
t.join()
# Third implementation with Process.
jobs = []
for chunk in grouper(5000, parsedcdr):
t = multiprocessing.Process(target=createRecords, args=(chunk,))
t.start()
jobs.append(t)
print "Process processing with chunks containing 5000"
for j in jobs:
j.join()
for j in jobs:
j.join()
誰かが私を正しい方向に向けることができますか?
あなたの最初の実装で見る限り、aproachはほぼOKですが、処理するリストの各要素は別のリスト(または反復可能)ですか?リスト内の要素 – Netwave
"parsedcdr" は、例えば、実際に他のリストである: '[1482232410、[ 'astp3'、u'elem1' 、u'elem2' 、u'elem3' ]]、[1482232576 、['astp3'、u'elem4 '、u'elem5'、u'elem6 ']]]' –
driesken
あなたの最初の実装は問題なく動作しているはずですが、どうしてそれらがsecuentiallyで動作していると思いますか?また、「5000」チャンクを削除して、1つ1つを選択するかどうかを確認してください。 – Netwave