2017-02-21 29 views
0

生データを小さなファイルに分割する前処理タスクluigiがあります。これらのファイルは、実際のパイプラインで処理されます。実行時にluigiの依存関係が変更される

パラメータに関しては、パラメータとして1つの前処理されたファイルIDを持つ各パイプラインが必要です。ただし、このファイルIDは前処理ステップでのみ生成されるため、実行時にのみ認識されます。 、Experiment

  1. 最初は、何らかの形で第二の文書

  2. への生データの分割を要求すべきである

    import luigi 
    import subprocess 
    import random 
    
    
    class GenPipelineFiles(luigi.Task): 
    
        input_file = luigi.Parameter() 
    
        def requires(self): 
         pass 
    
        def output(self): 
    
         for i in range(random.randint(0,10)): 
          yield luigi.LocalTarget("output/{}_{}.txt".format(self.input_file, i)) 
    
        def run(self): 
    
         for iout in self.output: 
          command = "touch {}".format(iout.fname) 
          subprocess.call(command, shell=True) 
    
    
    class RunPipelineOnSmallChunk(luigi.Task): 
        pass 
    
    
    class Experiment(luigi.WrapperTask): 
    
        input_file = luigi.Parameter(default="ex") 
    
        def requires(self): 
    
         file_ids = GenPipelineFiles(input_file=self.input_file) 
    
         for file_id in file_ids: 
          yield RunPipelineOnSmallChunk(directory=self.input_file, file_id=file_id) 
    
    
    luigi.run() 
    

    wrapperタスク:私の考えを説明するために、私はこの-動作しないコードを提供します取得された前処理のファイルIDを持つ実際のパイプラインが必要です。

GenPipelineFilesで出力ファイルの乱数が、これはExperimentrequiresにハードコーディングすることができないことを示しています。

おそらく、これに関連する質問は、luigiタスクには、入力ターゲットと出力ターゲットがそれぞれ1つしかないということです。おそらくGenPipelineFilesで複数の出力をモデル化する方法に関する注釈も問題を解決する可能性があります。

+0

あなたは、この時点で取得しているエラーを説明できますか? –

+0

luigi依存グラフは、 'requires'関数の戻り値に基づいて作成されます。ここでは、GePipelineFilesは決して返されないため、スケジュールされません。このコードは私の実際のコードではなく、決してエラーなしで実行されることはありません。それは私が直面している依存関係の問題を説明するためだけのものです –

答えて

0

複数の出力を処理する簡単なアプローチの1つは、入力ファイルの後に名前の付いたディレクトリを作成し、分割ファイルの出力ファイルを入力ファイルの名前を付けたディレクトリに置きます。そうすれば、従属タスクは単にディレクトリの存在を確認することができます。私は入力ファイル123.txtを持っているとしましょう。次に、ファイル123を出力する123_splitをファイル1.txt、2.txt、3.txtの出力としてGenPipelineFilesとし、ディレクトリ123_processed with 1.txt、2.txt、 RunPipelineOnSmallChunkの出力として3.txt。

requiresメソッドがExperimentの場合は、実行するタスクをリストなどに戻す必要があります。 file_ids = GenPipelineFiles(input_file=self.input_file)と書いたやり方では、メソッドによって返されていないので、そのオブジェクトのrunメソッドが呼び出されていないと思います。

ここでは、ファイルごとにターゲットで動作するサンプルコードをいくつか示します(ファイルごとのタスクではありません)。私はまだあなたが完了したことを示す何らかの種類のディレクトリまたはセンチネルファイルの単一の出力ターゲットを持つ方が安全だと思います。タスクによって各ターゲットが確実に作成されない限り、アトミシティは失われます。

PYTHONPATH=. luigi --module sampletask RunPipelineOnSmallChunk --local-scheduler 

sampletask.py

import luigi 
import os 
import subprocess 
import random 


class GenPipelineFiles(luigi.Task): 

    inputfile = luigi.Parameter() 
    num_targets = random.randint(0,10) 

    def requires(self): 
     pass 

    def get_prefix(self): 
     return self.inputfile.split(".")[0] 

    def get_dir(self): 
     return "split_{}".format(self.get_prefix()) 

    def output(self): 
     targets = [] 
     for i in range(self.num_targets): 
      targets.append(luigi.LocalTarget(" {}/{}_{}.txt".format(self.get_dir(), self.get_prefix(), i))) 
     return targets 

    def run(self): 
     if not os.path.exists(self.get_dir()): 
      os.makedirs(self.get_dir()) 
     for iout in self.output(): 
      command = "touch {}".format(iout.path) 
      subprocess.call(command, shell=True) 


class RunPipelineOnSmallChunk(luigi.Task): 

    inputfile = luigi.Parameter(default="test") 

    def get_prefix(self): 
     return self.inputfile.split(".")[0] 

    def get_dir(self): 
     return "processed_{}".format(self.get_prefix()) 

    @staticmethod 
    def clean_input_path(path): 
     return path.replace("split", "processed") 

    def requires(self): 
     return GenPipelineFiles(self.inputfile) 

    def output(self): 
     targets = [] 
     for target in self.input(): 
      targets.append(luigi.LocalTarget(RunPipelineOnSmallChunk.clean_input_path(target.path))) 
     return targets 

    def run(self): 
     if not os.path.exists(self.get_dir()): 
      os.makedirs(self.get_dir()) 
     for iout in self.output(): 
      command = "touch {}".format(iout.path) 
      subprocess.call(command, shell=True) 
+0

あなたの答えをありがとう、あなたの提案には最適でないと思う2つのことがありますが、私はあなたの答えを誤解しているかもしれません:1. RunPipelineOnSmallChunk' 'RunPipelineOnSmallChunk'タスクはアトミックではなく、代わりにすべてのファイルをループして必要な計算を実行する必要があります。 2。'Experiment'で2つのタイプのタスクが必要な場合、それぞれが必要な依存関係を持つため、luigiスケジューラーは不必要にサブタスクを複数回実行しようとします。 –

+0

2の場合、Experimentに 'RunPipelineOnSmallChunk'と' RunPipelineOnSmallChunk' require 'GenPipelineFiles'が必要になります。 – MattMcKnight

+0

あなたのコードが修正されたバージョン – MattMcKnight

関連する問題