同時にいくつかのLuigiワークフローを実行すると、ワーカーの数が合計されます。これは、2つのワークフローを一緒に実行し、luigi.cfgファイル内で作業者の数がnに設定され、ワークフローでn人以上の作業者が同時に使用される場合、中央スケジューラは、 2xn労働者。複数のスクリプトが同時に実行されている場合のLuigiワーカーの数を制限する
Luigiのマニュアルでは、同時に12のワークフローを実行しても、労働者数をnに制限する方法が見つかりませんでした。
この
は、これは私が使っているサンプルスクリプトがある[core]
workers: 3
私luigi.cfgファイルである(それはsciluigi(ルイージの上の層の実際の使用になります)が、私はそれを作るとは思いませんタスクとスケジューラの構成に関する違い)。 3回の最後のワークフローでは、開始前に3つの最初のワークフローが完了するのを待っています。
import optparse
import luigi
import sciluigi
import random
import time
import sys
import os
import subprocess
class MyFooWriter(sciluigi.Task):
# We have no inputs here
# Define outputs:
outdir = sciluigi.Parameter();
def out_foo(self):
return sciluigi.TargetInfo(self, os.path.join(self.outdir,'foo.txt'))
def run(self):
with self.out_foo().open('w') as foofile:
foofile.write('foo\n')
class MyFooReplacer(sciluigi.Task):
replacement = sciluigi.Parameter() # Here, we take as a parameter
# what to replace foo with.
outFile = sciluigi.Parameter();
outdir = sciluigi.Parameter();
# Here we have one input, a "foo file":
in_foo = None
# ... and an output, a "bar file":
def out_replaced(self):
return sciluigi.TargetInfo(self, os.path.join(self.outdir, self.outFile))
def run(self):
replacement = ""
with open(self.in_foo().path, 'r') as content_file:
content = content_file.read()
replacement = content.replace('foo', self.replacement)
for i in range(1,30):
sys.stderr.write(str(i)+"\n")
time.sleep(1)
with open(self.out_replaced().path,'w') as out_f:
out_f.write(replacement)
class MyWorkflow(sciluigi.WorkflowTask):
outdir = luigi.Parameter()
def workflow(self):
#rdint = random.randint(1,1000)
rdint = 100
barfile = "foobar_" + str(rdint) +'.bar.txt'
foowriter = self.new_task('foowriter', MyFooWriter, outdir = self.outdir)
fooreplacer = self.new_task('fooreplacer', MyFooReplacer, replacement='bar', outFile = barfile, outdir = self.outdir)
fooreplacer.in_foo = foowriter.out_foo
return fooreplacer
# End of script ....
if __name__ == '__main__':
parser = optparse.OptionParser()
parser.add_option('-d', dest = "outdir", action="store", default=".")
options, remainder = parser.parse_args()
params = {"outdir" : options.outdir}
wf = [MyWorkflow(outdir = options.outdir)]
luigi.build(wf)
これは私が(Perlの私の好きな言語:-)で)スクリプトを同時に実行するのに使う小さなperlスクリプトです。
#! /usr/bin/perl
use strict;
for (my $i = 0; $i < 6; $i++) {
my $testdir = "test".$i;
system("mkdir -p $testdir");
system("python run_sciluigi.py -d $testdir&");
sleep (2)
}
こんにちはマット、あなたの答えに感謝:あなたの仕事のすべてで
luigi.cfgで。これはまさに私がやることです。この質問は、主に私の労働者の概念の誤解から来ています。私は多かれ少なかれ、パイプラインが一緒に走ることができる作業の数が労働者であると理解していますが、リソースはその数に関係なく労働者が使用できる力です。 –