2017-09-13 3 views
0

ここに、時間の経過とともに労働者を失うことを示すためのmcveがあります。これは、例えばはかなり最小限のではなく、それは私たちの典型的な作業パターンのアイデアを与えるん時間をかけて労働者を失うことを避ける

Distributing graphs to across cluster nodes

にフォローです。問題を引き起こすには睡眠が必要です。これは、以前の結果から大きなグラフを生成する必要があるため、完全なアプリケーションで発生します。

私はクラスタ上でこれを実行すると、私は8つのノード上の32人の労働者を得るためにDASK-sshを使用:

dask-ssh --nprocs 4 --nthreads 1 --scheduler-port 8786 --log-directory `pwd` --hostfile hostfile.$JOBID & 
sleep 10 

それは労働者のフルセットで約10分未満で実行する必要があります。私は診断画面の実行に従います。イベントの下では、私は労働者が追加されているのを見ますが、時には、常にスケジューラをホストしているノードに残っている労働者だけを残している労働者の数がいつもではないことがあります。クライアントのための典型的な実験をGrep'ing

""" Test to illustrate losing workers under dask/distributed. 

This mimics the overall structure and workload of our processing. 

Tim Cornwell 9 Sept 2017 
[email protected] 
""" 
import numpy 
from dask import delayed 
from distributed import Client 


# Make some randomly located points on 2D plane 
def init_sparse(n, margin=0.1): 
    numpy.random.seed(8753193) 
    return numpy.array([numpy.random.uniform(margin, 1.0 - margin, n), 
         numpy.random.uniform(margin, 1.0 - margin, n)]).reshape([n, 2]) 


# Put the points onto a grid and FFT, skip to save time 
def grid_data(sparse_data, shape, skip=100): 
    grid = numpy.zeros(shape, dtype='complex') 
    loc = numpy.round(shape * sparse_data).astype('int') 
    for i in range(0, sparse_data.shape[0], skip): 
     grid[loc[i,:]] = 1.0 
    return numpy.fft.fft(grid).real 

# Accumulate all psfs into one psf 
def accumulate(psf_list): 
    lpsf = 0.0 * psf_list[0] 
    for p in psf_list: 
     lpsf += p 
    return lpsf 


if __name__ == '__main__': 
    import sys 
    import time 
    start=time.time() 

    # Process nchunks each of length len_chunk 2d points, making a psf of size shape 
    len_chunk = int(1e6) 
    nchunks = 16 
    shape=[512, 512] 
    skip = 100 

    # We pass in the scheduler from the invoking script 
    if len(sys.argv) > 1: 
     scheduler = sys.argv[1] 
     client = Client(scheduler) 
    else: 
     client = Client() 

    print("On initialisation", client) 

    sparse_graph = [delayed(init_sparse)(len_chunk) for i in range(nchunks)] 
    sparse_graph = client.compute(sparse_graph, sync=True) 
    print("After first sparse_graph", client) 

    xfr_graph = [delayed(grid_data)(s, shape=shape, skip=skip) for s in sparse_graph] 
    xfr = client.compute(xfr_graph, sync=True) 
    print("After xfr", client) 

    tsleep = 120.0 
    print("Sleeping now for %.1f seconds" % tsleep) 
    time.sleep(tsleep) 
    print("After sleep", client) 

    sparse_graph = [delayed(init_sparse)(len_chunk) for i in range(nchunks)] 
    # sparse_graph = client.compute(sparse_graph, sync=True) 
    xfr_graph = [delayed(grid_data)(s, shape=shape, skip=skip) for s in sparse_graph] 
    psf_graph = delayed(accumulate)(xfr_graph) 
    psf = client.compute(psf_graph, sync=True) 

    print("*** Successfully reached end in %.1f seconds ***" % (time.time() - start)) 
    print(numpy.max(psf)) 
    print("After psf", client) 

    client.shutdown() 
    exit() 

を示しています。これは動作しますが、それがなかった理由

On initialisation <Client: scheduler='tcp://sand-8-17:8786' processes=16 cores=16> 
After first sparse_graph <Client: scheduler='tcp://sand-8-17:8786' processes=16 cores=16> 
After xfr <Client: scheduler='tcp://sand-8-17:8786' processes=16 cores=16> 
After sleep <Client: scheduler='tcp://sand-8-17:8786' processes=4 cores=4> 
After psf <Client: scheduler='tcp://sand-8-17:8786' processes=4 cores=4> 

おかげで、 ティム

+0

私たちはまだこれで苦労しています。 '[worker openhpc-compute-1]:distributed.coreを参照してください。 - 警告 - イベントループは1.12秒間に応答しませんでした。これは、長時間GILを保持する機能や大量のデータを移動することによって発生することがよくあります。これは、タイムアウトと不安定を引き起こす可能性があります。 [scheduler openhpc-compute-0:8786]:distributed.scheduler - INFO - Worker 'tcp://10.60.253.19:39025'が閉じられた通信から失敗しました:TimeoutError:[Errno 110]接続がタイムアウトしました [スケジューラopenhpc-compute-0:8786]:distributed.scheduler - INFO - ワーカーtcpを削除://10.60.253.19:39025' –

答えて

0

それはかなり明確ではありません。私たちはdask-sshを使用していましたが、作業者の作成をより詳細に制御する必要がありました。最終的に我々は和解しました:

scheduler=$(head -1 hostfile.$JOBID) 
hostIndex=0 
for host in `cat hostfile.$JOBID`; do 
    echo "Working on $host ...." 
    if [ "$hostIndex" = "0" ]; then 
     echo "run dask-scheduler" 
     ssh $host dask-scheduler --port=8786 & 
     sleep 5 
    fi 
    echo "run dask-worker" 
    ssh $host dask-worker --host ${host} --nprocs NUMBER_PROCS_PER_NODE \ 
    --nthreads NUMBER_THREADS \ 
    --memory-limit 0.25 --local-directory /tmp $scheduler:8786 & 
    sleep 1 
    hostIndex="1" 
done 
echo "Scheduler and workers now running" 
関連する問題