IMPLするための非常に簡単な方法「スティッキーセッション」は自分のバージョンをmultiprocessing.Pool
にすることです。これは熱心に作業項目を割り当てますが、確定的に割り当てます。
import multiprocessing
import os
import time
def work(job):
time.sleep(1)
print "I am process", os.getpid(), "processing job", job
class StickyPool:
def __init__(self, processes):
self._inqueues = [multiprocessing.Queue() for ii in range(processes)]
self._pool = [multiprocessing.Process(target=self._run, args=(self._inqueues[ii],)) for ii in range(processes)]
for process in self._pool:
process.start()
def map(self, fn, args):
for arg in args:
ii = hash(arg) % len(self._inqueues)
self._inqueues[ii].put((fn, arg))
def _run(self, queue):
while True:
fn, arg = queue.get()
fn(arg)
pool = StickyPool(3)
#pool = multiprocessing.Pool(3)
pool.map(work, [1,2,3,4,1,2,3,4,1,2,3,4])
time.sleep(4)
上記StickyPool
を使用して、ジョブがその引数のハッシュに基づいて割り当てられています。ここでは、不完全が、実行可能なソリューションです。つまり、毎回同じ議論が同じプロセスに行きます。ハッシュが衝突するユニークな値がたくさんある場合は、ジョブを均等に分散するほどスマートではありませんが、今後の改善の余地があります。私もシャットダウンロジックを気にしなかったので、StickyPool
を使用するとプログラムは実行を停止しませんが、multiprocessing.Pool
を使用するとプログラムは停止しません。これらの問題を修正し、インターフェイス(詳細はapply()
など)を実装し、map()
の結果を返します。
あなたが探しているものを表すのによく使われる用語は、「スティッキセッション」です。 –
@JohnZwinckご返信ありがとうございます。私が確認できる参考資料はありますか?ありがとうございました。 – galaxyan