2013-06-11 23 views
14

私はPythonでマルチプロセッサプログラミングを試みています。例えば、Fibonacciのような分裂と征服のアルゴリズムを取る。実行のプログラムフローはツリーのように分岐し、並列に実行されます。つまり、nested parallelismの例があります。Pythonでネストされた並列処理

Javaから、リソースを管理するためにスレッドプールパターンを使用しました。プログラムは非常に短時間で分岐し、短命のスレッドを非常に多く作成する可能性があるからです。単一の静的(共有)スレッドプールは、ExecutorServiceによってインスタンス化できます。

Poolについては同じことが期待されますが、Pool object is not to be globally sharedと表示されます。たとえば、multiprocessing.Manager.Namespace()を使用してプールを共有すると、エラーが発生します。

プールオブジェクトは、私は2部構成の質問持っているプロセス間で渡されるか

を漬けすることはできません。

  1. 私はここで何をしないのですが。プールをプロセス間で共有すべきではないのはなぜですか?
  2. Pythonでネストされた並列性を実装するためのパターンはとは何ですか?可能であれば、再帰的な構造を維持し、反復のためにそれを取引しないでください。

from concurrent.futures import ThreadPoolExecutor 

def fibonacci(n): 
    if n < 2: 
     return n 
    a = pool.submit(fibonacci, n - 1) 
    b = pool.submit(fibonacci, n - 2) 
    return a.result() + b.result() 

def main(): 
    global pool 

    N = int(10) 
    with ThreadPoolExecutor(2**N) as pool: 
     print(fibonacci(N)) 

main() 

のJava

public class FibTask implements Callable<Integer> { 

    public static ExecutorService pool = Executors.newCachedThreadPool(); 
    int arg; 

    public FibTask(int n) { 
     this.arg= n; 
    } 

    @Override 
    public Integer call() throws Exception { 
     if (this.arg > 2) { 
      Future<Integer> left = pool.submit(new FibTask(arg - 1)); 
      Future<Integer> right = pool.submit(new FibTask(arg - 2)); 
      return left.get() + right.get(); 
     } else { 
      return 1; 
     } 

    } 

    public static void main(String[] args) throws Exception { 
     Integer n = 14; 
     Callable<Integer> task = new FibTask(n); 
     Future<Integer> result =FibTask.pool.submit(task); 
     System.out.println(Integer.toString(result.get())); 
     FibTask.pool.shutdown();    
    }  

} 

それがここで問題あれば、私はわからないんだけど、私は "プロセス" と "スレッド" との違いを無視しています。私にとっては両方とも「仮想化されたプロセッサ」を意味します。私の理解は、プールの目的は "プール"またはリソースの共有のためです。タスクを実行すると、プールにリクエストを送信できます。並列タスクが他のスレッドで完了すると、それらのスレッドを再利用して新しいタスクに割り当てることができます。プールの共有を拒否することは意味がありません。そのため、スレッドプールの目的を破るように見えるので、各スレッドは独自の新しいプールをインスタンス化する必要があります。

+0

なぜグローバルに共有する必要がありますか?1つの名前空間/クラス内にすべてを含めることはできませんか? –

+2

@InbarRose異なるプロセス内で再帰呼び出しを実行する再帰関数では、プールがforkされ、サブプロセスによって呼び出されるという問題があります。このため、キューに問題が発生し、動作しません。とにかく、Javaでは* threads *を使用していることを強調したいと思います。スレッドでは、プールオブジェクトの分岐がないため、問題はありません。私は、Javaでプロセスプールを使用することは、多かれ少なかれ、同じ動作につながると考えています。 – Bakuriu

+0

@InbarRoseクラスインスタンスと静的変数として 'Pool'を含めてみましたが、それでも問題は同じになりました。例えば、 'Pool'と再帰呼び出しは単一のクラスに含まれていますが、それでもやはり同じ問題が発生します。>プールオブジェクトはプロセス間を渡すことができません。 –

答えて

3

1)ここでは何が分かりませんか。プールをプロセス間で共有すべきではないのはなぜですか?

>>> import threading, pickle 
>>> pickle.dumps(threading.Lock()) 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
[...] 
    File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex 
    raise TypeError, "can't pickle %s objects" % base.__name__ 
TypeError: can't pickle lock objects 

以上:

ないすべてのオブジェクト/インスタンスは、この場合には、プールがpickableないthreading.lockを使用して、直列化可能/ pickableある

>>> import threading, pickle 
>>> from concurrent.futures import ThreadPoolExecutor 
>>> pickle.dumps(ThreadPoolExecutor(1)) 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1374, in dumps 
    Pickler(file, protocol).dump(obj) 
    File 
[...] 
"/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 306, in save 
     rv = reduce(self.proto) 
     File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex 
     raise TypeError, "can't pickle %s objects" % base.__name__ 
    TypeError: can't pickle lock objects 

あなたが考える場合それは意味があります。ロックはオペレーティングシステムによって管理されるセマフォープリミティブです(Pythonはネイティブスレッドを使用するため)。 pythonランタイムの中でそのオブジェクト状態をpickleして保存することができれば、本当の状態がOSによって保持されているので意味のある何かを達成することはできません。

2)Pythonでネストされた並列性を実装するためのパターンは何ですか?あなたがプロセスのスレッド(ThreadPoolExecutor)を使用していないので、可能な場合には、再帰的な構造を維持し、今反復

のためにそれを取引しない、威信のために、私は上記のすべてが本当に(あなたの例には適用されません。 ProcessPoolExecutor)ので、プロセス間のデータ共有は行われません。

あなたのJavaの例は、使用しているスレッドプール(CachedThreadPool)が必要に応じて新しいスレッドを作成しているのに対し、python実行プログラムの実装が制限されているため、明示的な最大スレッド数(max_workers)基本的には両方の例では、実行するスレッドの数がまったく同じになります(Pythonの静的インスタンスは基本的に明示的にスコープされていないものです)。例えば、ここでのpythonではかなりナイーブCachedThreadPoolExecutorの実装を使用した例です:

from concurrent.futures import ThreadPoolExecutor 

class CachedThreadPoolExecutor(ThreadPoolExecutor): 
    def __init__(self): 
     super(CachedThreadPoolExecutor, self).__init__(max_workers=1) 

    def submit(self, fn, *args, **extra): 
     if self._work_queue.qsize() > 0: 
      print('increasing pool size from %d to %d' % (self._max_workers, self._max_workers+1)) 
      self._max_workers +=1 

     return super(CachedThreadPoolExecutor, self).submit(fn, *args, **extra) 

pool = CachedThreadPoolExecutor() 

def fibonacci(n): 
    print n 
    if n < 2: 
     return n 
    a = pool.submit(fibonacci, n - 1) 
    b = pool.submit(fibonacci, n - 2) 
    return a.result() + b.result() 

print(fibonacci(10)) 

パフォーマンスチューニング:

それはスレッドのオーバーヘッドなしで、あなたに高い並列性を与えるので、私は強くgeventに探してお勧めします。これは必ずしもそうではありませんが、あなたのコードは実際にはgeventの使用のためのポスターの子です。完全に非科学的

import gevent 

def fibonacci(n): 
    print n 
    if n < 2: 
     return n 
    a = gevent.spawn(fibonacci, n - 1) 
    b = gevent.spawn(fibonacci, n - 2) 
    return a.get() + b.get() 

print(fibonacci(10)) 

が、私のコンピュータ上で上記のコードは、そのねじ切り同等より速く9xのを実行します:ここでは例です。

こちらがお役に立てば幸いです。

+1

geventはあなたに並列性を与えません。 –

+0

右は、計算上の並列性はありませんが、当初の問題は選択された線形アルゴリズムを変更するのではなく、改良するために共通のパターンを提案することでした。 –

+0

アルゴリズムの変更は必要ありません。この例では、すでに独立したサブタスクに分割されています。必要なのは、実際に並行してタスクを実行する基板(つまり、geventのような並行処理ソリューションではない)です。 –

0

1.ここでは何が欠けていますか。プールをプロセス間で共有すべきではないのはなぜですか?

一般に、言語に関係なく、プロセス間でOSスレッドを共有することはできません。

プールマネージャへのアクセスをワーカープロセスと共有することができますが、それはおそらく問題の良い解決策ではありません。下記参照。

2. Pythonでネストされた並列性を実装するためのパターンは何ですか?可能であれば、再帰的な構造を維持し、反復のためにそれを取引しない。

これはデータに大きく依存します。

CPythonでは、一般的な答えは、効率的な並列操作を実装するデータ構造を使用することです。これの良い例は、NumPyの最適化された配列の種類です。hereは、それらを使用して大規模な配列操作を複数のプロセッサコアに分割する例です。

ブロック反復を使用して実装されたフィボナッチ関数は、任意のワーカープールベースのアプローチに特に適していますが、fib(N)は他のワーカーを待っているだけで何もしません。フィボナッチ関数に具体的にアプローチするには他にも多くの方法があります(例:CPSを使用してブロッキングを除去し、一定数の作業者を埋めるなど)が、例ではなく実際の問題に基づいて戦略を決定する方が良いでしょうこのような。

関連する問題