2016-05-22 21 views
1

これで、マルチスレッドの深さ優先検索をツリーのような構造にしたいと思っています。私はこのためにクラスタ内の複数のコンピュータからスレッドを使用しています(この例ではlocalhost quad-coreとraspberry pi 2)。マスタースレッドはプロセスを開始し、ツリー内の最初の分割で、分割する各ノードごとに新しいスレッドを生成する必要があります。これらのスレッドは、結果をマスターに報告することができます。MPI Pythonで動的なスポーンを使った深さ優先検索

mpiexecにスレッドをいくつか提供するのではなく、動的にこれを実行しようとしています。ツリーがあらかじめどのように見えるのかわからないからです(例えば2または9の分割があるかもしれません)。

私はこの質問のために取り組んでいるプロジェクトからサンプルを作成しました。次のように作業しています。それは、数字の文字列から1桁を取り、各数字のためにスレッドを生成し、そのスレッドにその数字を送信します。マスターのために

#!/usr/bin/python 
from mpi4py import MPI 
import datetime, sys, numpy, time 

################ Set up MPI variables ################ 

comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 
size = comm.Get_size() 
name = MPI.Get_processor_name() 
status = MPI.Status() 

################ Master code ################ 

script = 'cpi.py' 
for d in '34': 
    try: 
     print 'Trying to spawn child process...' 
     icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0) 
     spawnrank = icomm.Get_rank() 
     icomm.send(d, dest=spawnrank, tag=11) 
     print 'Spawned rank %d.' % spawnrank  
    except: ValueError('Spawn failed to start.') 

solved = False 
while solved == False: 
    #while not comm.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG): 
    # print 'spawns doing some work...' 
    # time.sleep(1) 
solved = comm.recv(source=MPI.ANY_SOURCE, tag=22) 
print 'received solution: %d' % solved 

それは正しく労働者を生成します、彼らは数字を受けるが、マスタに戻ってそれを送信しません。これは、マスタコードの最後の行に到達したことがない

労働者

#!/usr/bin/python 
from mpi4py import MPI 
import datetime, sys, numpy 

################ Set up MPI variables ################ 

icomm = MPI.Comm.Get_parent() 
comm = MPI.COMM_WORLD 
irank = comm.Get_rank() 
rank = comm.Get_rank() 

running = True 
while running: 
    data = None 
    data = icomm.recv(source=0, tag=11) 
    if data: 
     print 'Trying to send %s from worker rank %d to %d' % (data, rank, irank) 
     icomm.send(data, dest=0, tag=22) 
     break 
print 'Worker on rank %d done.' % rank 
icomm.Disconnect() 

:労働者のためのコードは次のようです。また、マスターコードにプローブを追加して(コメントアウトして)、タグ22のメッセージがどこかでぶら下がっているかどうかを確認し、recv関数でエラーを除外していますが、プローブがメッセージを見つけることはありません。だから私はそれが送信されないと仮定します。

両方のプロセスのランクを印刷して計算したところ、これらは両方ともランク0を使用しています。これは同じコンピュータ上で生成されるため意味があります。私はホストファイルとrankfileを追加する場合しかし、その後、奴隷のために別のコンピュータを使用するように強制しようとしている、それは私に次のエラーを与える:

[hch-K55A:06917] *** Process received signal *** 
[hch-K55A:06917] Signal: Segmentation fault (11) 
[hch-K55A:06917] Signal code: Address not mapped (1) 
[hch-K55A:06917] Failing at address: 0x3c 
[hch-K55A:06917] [ 0] /lib/x86_64-linux-gnu/libpthread.so.0(+0x10340) [0x7f2c0d864340] 
[hch-K55A:06917] [ 1] /usr/lib/openmpi/lib/openmpi/mca_rmaps_rank_file.so(orte_rmaps_rank_file_lex+0x4a0) [0x7f2c0abdcb70] 
[hch-K55A:06917] [ 2] /usr/lib/openmpi/lib/openmpi/mca_rmaps_rank_file.so(+0x23ac) [0x7f2c0abda3ac] 
[hch-K55A:06917] [ 3] /usr/lib/libopen-rte.so.4(orte_rmaps_base_map_job+0x2e) [0x7f2c0dacd05e] 
[hch-K55A:06917] [ 4] /usr/lib/libopen-rte.so.4(orte_plm_base_setup_job+0x5a) [0x7f2c0dac580a] 
[hch-K55A:06917] [ 5] /usr/lib/openmpi/lib/openmpi/mca_plm_rsh.so(orte_plm_rsh_launch+0x338) [0x7f2c0b80a8c8] 
[hch-K55A:06917] [ 6] /usr/lib/libopen-rte.so.4(+0x51ff4) [0x7f2c0dac3ff4] 
[hch-K55A:06917] [ 7] /usr/lib/libopen-rte.so.4(opal_event_base_loop+0x31e) [0x7f2c0dae9cfe] 
[hch-K55A:06917] [ 8] mpiexec() [0x4047d3] 
[hch-K55A:06917] [ 9] mpiexec() [0x40347d] 
[hch-K55A:06917] [10] /lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf5) [0x7f2c0d4b0ec5] 
[hch-K55A:06917] [11] mpiexec() [0x403399] 
[hch-K55A:06917] *** End of error message *** 
Segmentation fault (core dumped) 

使用するコマンド:mpiexecの-np 1 --hostfileホストファイル - -rankfile rankfileパイソンspawntest.py

HOSTFILE:ローカルホスト ローカルホストスロット= 1 MAX-スロット= 4 PI2する@ raspi2スロット= 4

Rankfile: ランク0 = localhostのスロット= 1 ランク1 = pi2 @ raspi2 slot = 1-4

私の質問は次のとおりです。データを前後に送信できる間に、マスター以外のコンピューターにこれらのスレッドをスポーンするにはどうすればよいですか?

答えて

3

あなたのマスターのコードは非常に間違っており、あなたは何が起こっているのかについての概念的な理解が欠けていると感じています。

MPI_COMM_SPAWN(またはそのmpi4pyの対応部分comm.Spawn())によって生成されたジョブのMPIプロセスは、親のMPI_COMM_WORLDの一部にはなりません。スポーンされたプロセスは完全に別の世界コミュニケータを形成し、インターコミュニケータを介して親ジョブと連動します。これはまさにスポーンが返すものです。あなたのケースでは、icomm = MPI.COMM_SELF.Spawn(...)はマスタープロセスのインターコミュニケーターハンドルです。子ジョブ内のプロセスは、MPI_COMM_GET_PARENTMPI.Comm.Get_parent() in mpi4py)を使用してインターコミュニケータハンドルを取得します。あなたは、単一のプロセスジョブを産卵しているので:

MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0) 
                ^^^^^^^^^^ 

が新たに形成された世界の子ジョブのコミュニケーので、MPI.COMM_WORLD.Get_rank()リターンで唯一のプロセスは、各ワーカーにゼロとなります。

ご主人のコードのこの部分は間違っているが、それはまだ原因intercommunicatorsが実際にどのように動作するかに機能:

spawnrank = icomm.Get_rank() # <--- not what you expect 
icomm.send(d, dest=spawnrank, tag=11) 

Intercommunicatorsは、プロセスの2つのグループをリンクします。そのうちの一つがローカルグループと呼ばれ、他の一つはリモートグループと呼ばれています。 intercommunicatorにMPI_COMM_RANKcomm.Get_rank())を使用するときは、ローカルグループで呼び出し元プロセスのランクを取得します。送信または受信の場合、指定されたランクはリモートグループに関連しています。あなたのケースでは、以下のintercommunicatorに新しいワーカー結果を産卵:

mastet's MPI_COMM_SELF   child's MPI_COMM_WORLD 
       |        | 
+=============|================================|=============+ 
| +----------V----------+  +-------------V----------+ | 
| | group of the master |  | group of the child job | | 
| |  [ 0 ]  |  |   [ 0 ]   | | 
| +---------------------+  +------------------------+ | 
|     intercommunicator      | 
+============================================================+ 

;グループがローカルである(各グループが来るショー上記のコミュニケータは、コミュニケーター自体がintercommunicatorの一部ではない)

呼び出し元のプロセスがどのグループに属しているかによって異なります。マスタプロセスのローカルグループは、子ジョブのランクのリモートグループであり、その逆もあります。ここで重要なことは、グループに少なくとも1つのプロセスが存在するため、各グループにはランク0があることです。あなたは、マスター・グループは、その中に単一のプロセスを持っているとことだけラッキーですのでicomm.Get_rank()戻り0(マスタのローカルグループは、常に単一のプロセスが含まれているMPI_COMM_SELF、由来しているので、それは常にゼロを返します)、にたまたま(常に)リモート(子)グループ内の有効なランクです。

icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0) 
    icomm.send(d, dest=0, tag=11) 

(このコードは、明示的に前にしながら、リモートグループの0をランク付けするために送信されます行うには正しい事は、たとえばランク0、リモートグループ内に存在することがわかっている固定ランクにメッセージを送信することです間違ってはいるが - - 作品、まだ値0はちょうど幸運偶然)送信部、言っ

だったこと。受信部はありませんし、いくつかの理由があります。まず、間違ったコミュニケータを使用しています - 子プロセスがそのメンバーではないため、MPI_COMM_WORLDからの受信は機能しません。実際、MPIのコミュニケータは不変です。新しいコミュニケータを作成せずにランクを追加または削除することはできません。労働者から受け取るには、それを使用しているのと同じ方法で、icommを使用する必要があります。今度は2番目の問題があります。icommがマスターの新しいSpawnによって上書きされるため、子ジョブ以外の子ジョブと通信する能力は失われます。ハンドルのリストを保持し、そのハンドルをハンドルに追加する必要があります。

ザが一部を受け取るは、もう少し複雑です。 MPI_ANY_COMMはありません。すべての子ジョブがすべて別のインターコミュニケーターに居住しているので、受信操作を行うことはできません。あなたはintercommunicatorsのリストまたは(より良い)を超えるMPI_IPROBEとのいずれかのループがMPI_WAIT_SOMEを(mpi4py相当が何であれ)を使用し、その後、それぞれの子から受信して、非ブロッキング開始する必要があります。ループ付き

は、マスターコードは次のようになります(注 - 未テストコードを、私は持っている、および/または使用しないmpi4py):

#!/usr/bin/python 
from mpi4py import MPI 
import datetime, sys, numpy, time 

################ Set up MPI variables ################ 

comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 
size = comm.Get_size() 
name = MPI.Get_processor_name() 
status = MPI.Status() 

################ Master code ################ 

icomms = [] 
script = 'cpi.py' 
for d in '34': 
    try: 
     print 'Trying to spawn child process...' 
     icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0) 
     icomm.send(d, dest=0, tag=11) 
     icomms.append(icomm) 
     print 'Spawned a child.' 
    except: ValueError('Spawn failed to start.') 

solved = False 
while not solved and icomms: 
    for icomm in icomms: 
     if icomm.Iprobe(source=0, tag=MPI.ANY_TAG): 
      print 'A child responded...' 
      solved = icomm.recv(source=0, tag=MPI.ANY_TAG) 
      icomm.Disconnect() 
      icomms.remove(icomm) 
      if solved: break 
    if not solved: 
     print 'spawns doing some work...' 
     time.sleep(1) 
# make sure all pending sends get matched 
for icomm in icomms: 
    icomm.recv(source=0, tag=MPI.ANY_TAG) 
    icomm.Disconnect() 
print 'received solution: %d' % solved 

私はあなたのアイデアを得る願っています。

追加:スポーンされたジョブ内からジョブをスポーンすると、新しい子は最上位のマスターへの接続を簡単に確立できません。そのためには、MPI-2クライアント/サーバーモデルサポートのあいまいな部分に目を向け、マスターにMPI_PORT_OPENのポートを開いてから、MPI_PUBLISH_NAMEを使用してMPIネーミングサービスに登録し、最後にMPI_COMM_ACCEPTを使用して、他のMPIジョブ。ワーカーはMPI_LOOKUP_NAMEを使用してポートへの参照を取得し、MPI_COMM_CONNECTを使用してマスタージョブとの通信を確立する必要があります。これらの関数のラッパーがmpi4pyに存在するかどうかはわかりません。

+0

詳細な回答をいただきありがとうございます。 – Enzime

関連する問題