2017-11-14 23 views
1

同時にいくつかのLuigiワークフローを実行すると、ワーカーの数が合計されます。これは、2つのワークフローを一緒に実行し、luigi.cfgファイル内で作業者の数がnに設定され、ワー​​クフローでn人以上の作業者が同時に使用される場合、中央スケジューラは、 2xn労働者。複数のスクリプトが同時に実行されている場合のLuigiワーカーの数を制限する

Luigiのマニュアルでは、同時に12のワークフローを実行しても、労働者数をnに制限する方法が見つかりませんでした。

この

は、これは私が使っているサンプルスクリプトがある

[core] 
workers: 3 

私luigi.cfgファイルである(それはsciluigi(ルイージの上の層の実際の使用になります)が、私はそれを作るとは思いませんタスクとスケジューラの構成に関する違い)。 3回の最後のワークフローでは、開始前に3つの最初のワークフローが完了するのを待っています。

import optparse 
import luigi 
import sciluigi 
import random 
import time 
import sys 
import os 
import subprocess 


class MyFooWriter(sciluigi.Task): 
    # We have no inputs here 
    # Define outputs: 
    outdir = sciluigi.Parameter(); 
    def out_foo(self): 
     return sciluigi.TargetInfo(self, os.path.join(self.outdir,'foo.txt')) 
    def run(self): 
     with self.out_foo().open('w') as foofile: 
      foofile.write('foo\n') 

class MyFooReplacer(sciluigi.Task): 
    replacement = sciluigi.Parameter() # Here, we take as a parameter 
            # what to replace foo with. 
    outFile = sciluigi.Parameter(); 
    outdir = sciluigi.Parameter(); 
    # Here we have one input, a "foo file": 
    in_foo = None 

    # ... and an output, a "bar file": 
    def out_replaced(self): 
     return sciluigi.TargetInfo(self, os.path.join(self.outdir, self.outFile)) 

    def run(self): 
     replacement = "" 
     with open(self.in_foo().path, 'r') as content_file: 
      content = content_file.read() 
      replacement = content.replace('foo', self.replacement) 
      for i in range(1,30): 
       sys.stderr.write(str(i)+"\n") 
       time.sleep(1) 
     with open(self.out_replaced().path,'w') as out_f: 
      out_f.write(replacement) 



class MyWorkflow(sciluigi.WorkflowTask): 
    outdir = luigi.Parameter() 
    def workflow(self): 
     #rdint = random.randint(1,1000) 
     rdint = 100 
     barfile = "foobar_" + str(rdint) +'.bar.txt' 
     foowriter = self.new_task('foowriter', MyFooWriter, outdir = self.outdir) 
     fooreplacer = self.new_task('fooreplacer', MyFooReplacer, replacement='bar', outFile = barfile, outdir = self.outdir) 
     fooreplacer.in_foo = foowriter.out_foo 
     return fooreplacer 


# End of script .... 
if __name__ == '__main__': 
    parser = optparse.OptionParser() 
    parser.add_option('-d', dest = "outdir", action="store", default=".") 
    options, remainder = parser.parse_args() 
    params = {"outdir" : options.outdir}  
    wf = [MyWorkflow(outdir = options.outdir)] 
    luigi.build(wf) 

これは私が(Perlの私の好きな言語:-)で)スクリプトを同時に実行するのに使う小さなperlスクリプトです。

#! /usr/bin/perl 

use strict; 

for (my $i = 0; $i < 6; $i++) { 
    my $testdir = "test".$i; 
    system("mkdir -p $testdir"); 
    system("python run_sciluigi.py -d $testdir&"); 
    sleep (2) 
} 

答えて

0

ない正確に労働者の制限、それは同時実行にグローバルな制限を置くためにresources概念を使用することは可能ですが。

class MyFooReplacer(sciluigi.Task): 
    resources = {'max_workers': 1} 

http://luigi.readthedocs.io/en/stable/configuration.html#resources

+0

こんにちはマット、あなたの答えに感謝:あなたの仕事のすべてで

luigi.cfgで

[resources] max_workers=5 

。これはまさに私がやることです。この質問は、主に私の労働者の概念の誤解から来ています。私は多かれ少なかれ、パイプラインが一緒に走ることができる作業の数が労働者であると理解していますが、リソースはその数に関係なく労働者が使用できる力です。 –

関連する問題