ここに、時間の経過とともに労働者を失うことを示すためのmcveがあります。これは、例えばはかなり最小限のではなく、それは私たちの典型的な作業パターンのアイデアを与えるん時間をかけて労働者を失うことを避ける
Distributing graphs to across cluster nodes
にフォローです。問題を引き起こすには睡眠が必要です。これは、以前の結果から大きなグラフを生成する必要があるため、完全なアプリケーションで発生します。
私はクラスタ上でこれを実行すると、私は8つのノード上の32人の労働者を得るためにDASK-sshを使用:
dask-ssh --nprocs 4 --nthreads 1 --scheduler-port 8786 --log-directory `pwd` --hostfile hostfile.$JOBID &
sleep 10
それは労働者のフルセットで約10分未満で実行する必要があります。私は診断画面の実行に従います。イベントの下では、私は労働者が追加されているのを見ますが、時には、常にスケジューラをホストしているノードに残っている労働者だけを残している労働者の数がいつもではないことがあります。クライアントのための典型的な実験をGrep'ing
は""" Test to illustrate losing workers under dask/distributed.
This mimics the overall structure and workload of our processing.
Tim Cornwell 9 Sept 2017
[email protected]
"""
import numpy
from dask import delayed
from distributed import Client
# Make some randomly located points on 2D plane
def init_sparse(n, margin=0.1):
numpy.random.seed(8753193)
return numpy.array([numpy.random.uniform(margin, 1.0 - margin, n),
numpy.random.uniform(margin, 1.0 - margin, n)]).reshape([n, 2])
# Put the points onto a grid and FFT, skip to save time
def grid_data(sparse_data, shape, skip=100):
grid = numpy.zeros(shape, dtype='complex')
loc = numpy.round(shape * sparse_data).astype('int')
for i in range(0, sparse_data.shape[0], skip):
grid[loc[i,:]] = 1.0
return numpy.fft.fft(grid).real
# Accumulate all psfs into one psf
def accumulate(psf_list):
lpsf = 0.0 * psf_list[0]
for p in psf_list:
lpsf += p
return lpsf
if __name__ == '__main__':
import sys
import time
start=time.time()
# Process nchunks each of length len_chunk 2d points, making a psf of size shape
len_chunk = int(1e6)
nchunks = 16
shape=[512, 512]
skip = 100
# We pass in the scheduler from the invoking script
if len(sys.argv) > 1:
scheduler = sys.argv[1]
client = Client(scheduler)
else:
client = Client()
print("On initialisation", client)
sparse_graph = [delayed(init_sparse)(len_chunk) for i in range(nchunks)]
sparse_graph = client.compute(sparse_graph, sync=True)
print("After first sparse_graph", client)
xfr_graph = [delayed(grid_data)(s, shape=shape, skip=skip) for s in sparse_graph]
xfr = client.compute(xfr_graph, sync=True)
print("After xfr", client)
tsleep = 120.0
print("Sleeping now for %.1f seconds" % tsleep)
time.sleep(tsleep)
print("After sleep", client)
sparse_graph = [delayed(init_sparse)(len_chunk) for i in range(nchunks)]
# sparse_graph = client.compute(sparse_graph, sync=True)
xfr_graph = [delayed(grid_data)(s, shape=shape, skip=skip) for s in sparse_graph]
psf_graph = delayed(accumulate)(xfr_graph)
psf = client.compute(psf_graph, sync=True)
print("*** Successfully reached end in %.1f seconds ***" % (time.time() - start))
print(numpy.max(psf))
print("After psf", client)
client.shutdown()
exit()
を示しています。これは動作しますが、それがなかった理由
On initialisation <Client: scheduler='tcp://sand-8-17:8786' processes=16 cores=16>
After first sparse_graph <Client: scheduler='tcp://sand-8-17:8786' processes=16 cores=16>
After xfr <Client: scheduler='tcp://sand-8-17:8786' processes=16 cores=16>
After sleep <Client: scheduler='tcp://sand-8-17:8786' processes=4 cores=4>
After psf <Client: scheduler='tcp://sand-8-17:8786' processes=4 cores=4>
おかげで、 ティム
私たちはまだこれで苦労しています。 '[worker openhpc-compute-1]:distributed.coreを参照してください。 - 警告 - イベントループは1.12秒間に応答しませんでした。これは、長時間GILを保持する機能や大量のデータを移動することによって発生することがよくあります。これは、タイムアウトと不安定を引き起こす可能性があります。 [scheduler openhpc-compute-0:8786]:distributed.scheduler - INFO - Worker 'tcp://10.60.253.19:39025'が閉じられた通信から失敗しました:TimeoutError:[Errno 110]接続がタイムアウトしました [スケジューラopenhpc-compute-0:8786]:distributed.scheduler - INFO - ワーカーtcpを削除://10.60.253.19:39025' –