多くの私のプロジェクトでは、パイプラインツールとしてluigiを使用しています。これは、私がそれを使ってパラメータ検索を実装することを考えさせました。luigiの多くのパラメータを扱う
def output(self):
return luigi.LocalTarget("data/artist_streams_%s.tsv" % self.date_interval)
つまり、パラメータはファイル名で保存されます:標準luigi.file.LocalTarget
もドキュメントの例で示されているパラメータに対処するための非常に単純なアプローチを持っています。これにより、特定のパラメータの組み合わせがすでに計算されているかどうかを簡単に確認できます。これは、タスクのパラメータがより複雑になるとすぐに乱雑になります。ここで
は、パラメータ検索の非常にシンプルなアイデアです:
この例では、4つのすべてのパラメータがファイル名にエンコードすることができますが、それはファンタシーの多くを必要としない、そのimport luigi
class Sum(luigi.Task):
long_ = luigi.Parameter()
list_ = luigi.Parameter()
of = luigi.Parameter()
parameters = luigi.Parameter()
def output(self):
return luigi.LocalTarget("task{}_{}_{}_{}.txt".format(self.long_,
self.list_,
self.of,
self.parameters))
def run(self):
sum_ = self.long_ + self.list_ + self.of + self.parameters
with self.output().open('w') as out_file:
out_file.write(str(sum_))
class ParameterSearch(luigi.Task):
def requires(self):
list_of_parameter_combinations = [
{
"long_" : 1,
"list_" : 2,
"of" : 3,
"parameters" : 4
},{
"long_" : 5,
"list_" : 6,
"of" : 7,
"parameters" : 8
}
]
for pc in list_of_parameter_combinations:
yield Sum(**pc)
確かに、このアプローチは境界に到達することができます。たとえば、配列のようなパラメータを考えてみましょう。
私は、パラメータと結果をある種の封筒オブジェクトに保存し、それをターゲットとして保存できるようにしました。 ファイル名は、最初のファジー検索のパラメータの何らかの種類のハッシュになります。
封筒のクラスがあり
class Envelope(object):
@classmethod
def hashify(cls, params):
return hash(frozenset(params.items()))
def __init__(self, result, **params):
self.params = {}
for k in params:
self.params[k] = params.get(k)
def hash(self):
return Envelope.hashify(self.params)
その後、LocalTargetを高め、エンベロープ内の全てのパラメータが一致しているかどうかを確認することができ、新たな目標があり、:
class EnvelopedTarget(luigi.file.LocalTarget):
fs = luigi.file.LocalFileSystem()
def __init__(self, params, path=None, format=None, is_tmp=False):
self.path = path
self.params = params
if format is None:
format = luigi.file.get_default_format()
if not path:
if not is_tmp:
raise Exception('path or is_tmp must be set')
path = os.path.join(tempfile.gettempdir(), 'luigi-tmp-%09d' % random.randint(0, 999999999))
super(EnvelopedTarget, self).__init__(path)
self.format = format
self.is_tmp = is_tmp
def exists(self):
path = self.path
if '*' in path or '?' in path or '[' in path or '{' in path:
logger.warning("Using wildcards in path %s might lead to processing of an incomplete dataset; "
"override exists() to suppress the warning.", path)
if self.fs.exists(path):
with self.open() as fin:
envelope = pickle.load(fin)
try:
assert len(envelope.params) == len(self.params)
for param,paramval in self.params.items():
assert paramval == envelope.params.get(param)
except(AssertionError):
return False
return True
else:
return False
ここでの問題は、このターゲットを使用すると、もともとルイジが最小限に抑えることを目指す定型文が追加されることです。このタスクは、初めにSum
タスクと同じ方法で実行することができます
class EnvelopedSum(BaseTask):
long_ = luigi.Parameter()
list_ = luigi.Parameter()
of = luigi.Parameter()
parameters = luigi.Parameter()
def my_run(self):
return sum(self.param_kwargs.values()), self.param_kwargs
:私は新しい基本タスク
class BaseTask(luigi.Task):
def output(self, envelope):
path = '{}{}.txt'.format(type(self).__name__, envelope.hash())
params = envelope.params
return EnvelopedTarget(params, path=path)
def complete(self):
envelope = Envelope(None, **self.param_kwargs)
outputs = flatten(self.output(envelope))
if len(outputs) == 0:
warnings.warn(
"Task %r without outputs has no custom complete() method" % self,
stacklevel=2
)
return False
return all(map(lambda output: output.exists(), outputs))
def run(self):
result, outparams = self.my_run()
envelope = Envelope(result, **outparams)
with self.output(envelope).open('w') as fout:
pickle.dump(envelope, fout)
結果EnvelopedSum
タスクは、その後かなり小さなだろうを設定します。
注:luigi-task-resultsをエンベロープする方法のこの実装例は、安定していないため、結果とパラメータをエンベロープで表したものです。
私の質問はです:luigiの複雑なパラメータを扱う簡単な方法はありませんか?
フォローアップの質問:パラメータ検索が実行されたコードバージョン(および/またはサブバージョンのパッケージバージョン)の記録を保持することを考えている人はいますか?
このトピックのどこから読んだかに関するコメントもありがとうございます。
注:
あなたはおそらく、このランニングをするために、いくつかの輸入が必要になります。
from luigi.task import flatten
import warnings
import pickle