今日、私が書いた機械学習の仕事は、手作業で行われています。私は必要な入力ファイルをダウンロードし、学習し、予測し、.csvファイルを出力し、それをデータベースにコピーします。Luigiを使用した反復機械学習ETL
しかし、これが生産に入っているので、私はこのプロセスをすべて自動化する必要があります。必要な入力ファイルは毎月(そして最終的にはより頻繁に)プロバイダからのS3バケットに届きます。
私はこの問題を解決するためにLuigiを使用する予定です。ここでは理想的なプロセスです:
- 毎週私は新しいファイル
- ファイルが到着 、私の機械学習のためのS3バケットを見るために私のプログラムを必要とする(または日、または時間、私が感じるものは何でも良いです)パイプラインが発射され、いくつかのパンダのデータフレームが吐き出される。その後
- 、私は別のDB
に、これらの結果を書き込むための私のプログラムを必要とする問題は、私は自動化するためにルイージを使用する方法がわからない、です:
- ファイルスケジュールタスク(毎月用など)
- は、(再現可能な方法で)それを展開
今日は、ここで私が考えているパイプラインのスケルトンは次のとおりです。
import luigi
from mylib import ml_algorithm
from mytools import read_s3, write_hdfs, read_hdfs, write_db, new_files, mark_as_done
class Extract(luigi.Task):
date = luigi.DateParameter()
s3_path = luigi.Parameter()
filename = luigi.Parameter()
def requires(self):
pass
def output(self, filename):
luigi.hdfs.HdfsTarget(self.date.strftime('data/%Y_%m_%d' + self.filename)
def run(self):
data = read_s3(s3_path + '/' + file)
with self.output.open('w') as hdfs_file:
write_hdfs(hdfs_file, data)
class Transform(luigi.Task):
date = luigi.DateParameter()
s3_path = luigi.Parameter()
filename = luigi.Parameter()
def requires(self):
return Extract(self.date, self.s3_path, self.filename)
def output(self, filename):
luigi.hdfs.HdfsTarget(self.date.strftime('data/results/%Y_%m_%d_' + filename)
def run(self):
with self.input().open('r') as inputfile:
data = read_hdfs(inputfile)
result = ml_algorithm(data)
with self.output().open('w') as outputfile:
write_hdfs(outputfile, result)
mark_as_done(filename)
class Load(luigi.Task):
date = luigi.DateParameter()
s3_path = luigi.Parameter()
def requires(self):
return [Transform(self.date, self.s3_path, filename) for filename in new_files(self.s3_path)]
def output(self):
# Fake DB target, just for illustrative purpose
luigi.hdfs.DBTarget('...')
def run(self):
for input in self.input():
with input.open('r') as inputfile:
result = read_hdfs(inputfile)
# again, just for didatic purposes
db = self.output().connection
write_db(db, result)
その後、私はのcrontabと単純にドッカー容器にラップするためにこれを追加します。
質問:
- これは、人々がこれを行うために使用する正しいパターンですか?それを行うより良い方法はありますか?
- 入力データに依存する
Transform1
(Transform1
の結果に依存)とTransform2
があり、両方の結果を別のDBに保存したい場合は、どのようにルイジパイプラインを使用してこれを実装できますかファイル)? - 人々はこれにcron以外の何かを使用していますか?
- これを正しくコンテナ化するにはどうすればよいですか?
多段階ETLの例は次のとおりです。入力データはユーザー表です。各ユーザーにはいくつかの機能があり、最初の変換は欠損値を含む列を埋めることができ、2番目の変換はクラスタリングそのユーザーの私は塗りつぶされたテーブルとクラスターの両方を保存したいと思います。 – prcastro