2016-10-12 12 views
0

多くの私のプロジェクトでは、パイプラインツールとしてluigiを使用しています。これは、私がそれを使ってパラメータ検索を実装することを考えさせました。luigiの多くのパラメータを扱う

def output(self): 
    return luigi.LocalTarget("data/artist_streams_%s.tsv" % self.date_interval) 

つまり、パラメータはファイル名で保存されます:標準luigi.file.LocalTargetもドキュメントの例で示されているパラメータに対処するための非常に単純なアプローチを持っています。これにより、特定のパラメータの組み合わせがすでに計算されているかどうかを簡単に確認できます。これは、タスクのパラメータがより複雑になるとすぐに乱雑になります。ここで

は、パラメータ検索の非常にシンプルなアイデアです:

この例では、4つのすべてのパラメータがファイル名にエンコードすることができますが、それはファンタシーの多くを必要としない、その
import luigi 
class Sum(luigi.Task): 

    long_ = luigi.Parameter() 
    list_ = luigi.Parameter() 
    of = luigi.Parameter() 
    parameters = luigi.Parameter() 

    def output(self): 
     return luigi.LocalTarget("task{}_{}_{}_{}.txt".format(self.long_, 
                   self.list_, 
                   self.of, 
                   self.parameters)) 

    def run(self): 

     sum_ = self.long_ + self.list_ + self.of + self.parameters 
     with self.output().open('w') as out_file: 
      out_file.write(str(sum_)) 


class ParameterSearch(luigi.Task): 

    def requires(self): 

     list_of_parameter_combinations = [ 
      { 
       "long_" : 1, 
       "list_" : 2, 
       "of" : 3, 
       "parameters" : 4 

      },{ 
       "long_" : 5, 
       "list_" : 6, 
       "of" : 7, 
       "parameters" : 8 
      } 
     ] 

     for pc in list_of_parameter_combinations: 
      yield Sum(**pc) 

確かに、このアプローチは境界に到達することができます。たとえば、配列のようなパラメータを考えてみましょう。

私は、パラメータと結果をある種の封筒オブジェクトに保存し、それをターゲットとして保存できるようにしました。 ファイル名は、最初のファジー検索のパラメータの何らかの種類のハッシュになります。

封筒のクラスがあり

class Envelope(object): 

    @classmethod 
    def hashify(cls, params): 
     return hash(frozenset(params.items())) 

    def __init__(self, result, **params): 

     self.params = {} 
     for k in params: 
      self.params[k] = params.get(k) 

    def hash(self): 
     return Envelope.hashify(self.params) 

その後、LocalTargetを高め、エンベロープ内の全てのパラメータが一致しているかどうかを確認することができ、新たな目標があり、:

class EnvelopedTarget(luigi.file.LocalTarget): 

    fs = luigi.file.LocalFileSystem() 

    def __init__(self, params, path=None, format=None, is_tmp=False): 
     self.path = path 
     self.params = params 

     if format is None: 
      format = luigi.file.get_default_format() 

     if not path: 
      if not is_tmp: 
       raise Exception('path or is_tmp must be set') 
      path = os.path.join(tempfile.gettempdir(), 'luigi-tmp-%09d' % random.randint(0, 999999999)) 
     super(EnvelopedTarget, self).__init__(path) 
     self.format = format 
     self.is_tmp = is_tmp 

    def exists(self): 
     path = self.path 
     if '*' in path or '?' in path or '[' in path or '{' in path: 
      logger.warning("Using wildcards in path %s might lead to processing of an incomplete dataset; " 
          "override exists() to suppress the warning.", path) 
     if self.fs.exists(path): 
      with self.open() as fin: 
       envelope = pickle.load(fin) 

       try: 
        assert len(envelope.params) == len(self.params) 
        for param,paramval in self.params.items(): 
         assert paramval == envelope.params.get(param) 

       except(AssertionError): 
        return False 
      return True 
     else: 
      return False 

ここでの問題は、このターゲットを使用すると、もともとルイジが最小限に抑えることを目指す定型文が追加されることです。このタスクは、初めにSumタスクと同じ方法で実行することができます

class EnvelopedSum(BaseTask): 

    long_ = luigi.Parameter() 
    list_ = luigi.Parameter() 
    of = luigi.Parameter() 
    parameters = luigi.Parameter() 

    def my_run(self): 
     return sum(self.param_kwargs.values()), self.param_kwargs 

:私は新しい基本タスク

class BaseTask(luigi.Task): 

    def output(self, envelope): 
     path = '{}{}.txt'.format(type(self).__name__, envelope.hash()) 
     params = envelope.params 
     return EnvelopedTarget(params, path=path) 

    def complete(self): 

     envelope = Envelope(None, **self.param_kwargs) 

     outputs = flatten(self.output(envelope)) 
     if len(outputs) == 0: 
      warnings.warn(
       "Task %r without outputs has no custom complete() method" % self, 
       stacklevel=2 
      ) 
      return False 

     return all(map(lambda output: output.exists(), outputs)) 

    def run(self): 

     result, outparams = self.my_run() 

     envelope = Envelope(result, **outparams) 

     with self.output(envelope).open('w') as fout: 
      pickle.dump(envelope, fout) 

結果EnvelopedSumタスクは、その後かなり小さなだろうを設定します。

注:luigi-task-resultsをエンベロープする方法のこの実装例は、安定していないため、結果とパラメータをエンベロープで表したものです。

私の質問はです:luigiの複雑なパラメータを扱う簡単な方法はありませんか?

フォローアップの質問:パラメータ検索が実行されたコードバージョン(および/またはサブバージョンのパッケージバージョン)の記録を保持することを考えている人はいますか?

このトピックのどこから読んだかに関するコメントもありがとうございます。

注:

あなたはおそらく、このランニングをするために、いくつかの輸入が必要になります。

from luigi.task import flatten 
import warnings 
import pickle 

答えて

1

あなたはメーリングリストで提案のこの種に良好な応答を取得する可能性があります。 Luigiのタスクコードは、すでに取得できるユニークなタスクIDを生成するために、パラメータのMD5ハッシュを生成しています。

https://github.com/spotify/luigi/blob/master/luigi/task.py#L79-L82

# task_id is a concatenation of task family, the first values of the first 3 parameters 
# sorted by parameter name and a md5hash of the family/parameters as a cananocalised json. 
param_str = json.dumps(params, separators=(',', ':'), sort_keys=True) 
param_hash = hashlib.md5(param_str.encode('utf-8')).hexdigest() 
関連する問題