2017-05-04 12 views
1

今日、私が書いた機械学習の仕事は、手作業で行われています。私は必要な入力ファイルをダウンロードし、学習し、予測し、.csvファイルを出力し、それをデータベースにコピーします。Luigiを使用した反復機械学習ETL

しかし、これが生産に入っているので、私はこのプロセスをすべて自動化する必要があります。必要な入力ファイルは毎月(そして最終的にはより頻繁に)プロバイダからのS3バケットに届きます。

私はこの問題を解決するためにLuigiを使用する予定です。ここでは理想的なプロセスです:

  • 毎週私は新しいファイル
  • ファイルが到着
  • 、私の機械学習のためのS3バケットを見るために私のプログラムを必要とする(または日、または時間、私が感じるものは何でも良いです)パイプラインが発射され、いくつかのパンダのデータフレームが吐き出される。その後
  • 、私は別のDB

に、これらの結果を書き込むための私のプログラムを必要とする問題は、私は自動化するためにルイージを使用する方法がわからない、です:

  • を見

    1. ファイルスケジュールタスク(毎月用など)
    2. は、(再現可能な方法で)それを展開

    今日は、ここで私が考えているパイプラインのスケルトンは次のとおりです。

    import luigi 
    
    from mylib import ml_algorithm 
    from mytools import read_s3, write_hdfs, read_hdfs, write_db, new_files, mark_as_done 
    
    class Extract(luigi.Task): 
        date = luigi.DateParameter() 
        s3_path = luigi.Parameter() 
        filename = luigi.Parameter() 
        def requires(self): 
         pass 
        def output(self, filename): 
         luigi.hdfs.HdfsTarget(self.date.strftime('data/%Y_%m_%d' + self.filename) 
        def run(self): 
         data = read_s3(s3_path + '/' + file) 
         with self.output.open('w') as hdfs_file: 
          write_hdfs(hdfs_file, data) 
    
    
    class Transform(luigi.Task): 
        date = luigi.DateParameter() 
        s3_path = luigi.Parameter() 
        filename = luigi.Parameter() 
        def requires(self): 
         return Extract(self.date, self.s3_path, self.filename) 
        def output(self, filename): 
         luigi.hdfs.HdfsTarget(self.date.strftime('data/results/%Y_%m_%d_' + filename) 
        def run(self): 
         with self.input().open('r') as inputfile: 
          data = read_hdfs(inputfile) 
         result = ml_algorithm(data) 
         with self.output().open('w') as outputfile: 
          write_hdfs(outputfile, result) 
         mark_as_done(filename) 
    
    
    
    class Load(luigi.Task): 
        date = luigi.DateParameter() 
        s3_path = luigi.Parameter() 
        def requires(self): 
         return [Transform(self.date, self.s3_path, filename) for filename in new_files(self.s3_path)] 
        def output(self): 
         # Fake DB target, just for illustrative purpose 
         luigi.hdfs.DBTarget('...') 
        def run(self): 
         for input in self.input(): 
          with input.open('r') as inputfile: 
           result = read_hdfs(inputfile) 
          # again, just for didatic purposes 
          db = self.output().connection 
          write_db(db, result) 
    

    その後、私はのcrontabと単純にドッカー容器にラップするためにこれを追加します。

    質問:

    • これは、人々がこれを行うために使用する正しいパターンですか?それを行うより良い方法はありますか?
    • 入力データに依存するTransform1Transform1の結果に依存)とTransform2があり、両方の結果を別のDBに保存したい場合は、どのようにルイジパイプラインを使用してこれを実装できますかファイル)?
    • 人々はこれにcron以外の何かを使用していますか?
    • これを正しくコンテナ化するにはどうすればよいですか?
  • +0

    多段階ETLの例は次のとおりです。入力データはユーザー表です。各ユーザーにはいくつかの機能があり、最初の変換は欠損値を含む列を埋めることができ、2番目の変換はクラスタリングそのユーザーの私は塗りつぶされたテーブルとクラスターの両方を保存したいと思います。 – prcastro

    答えて

    2

    あなたのパターンは大きく見えます。まず、cronジョブを使用して、Loadタスクパイプラインをトリガーするスクリプトを呼び出すことから始めます。このLoadタスクは既にS3バケット内に新しいファイルの存在を確認していますが、出力ファイルを条件ファイルに変更する必要があります。新しいファイルがある場合にのみ、Loadタスクが必要だったより高いレベルのWrapperTask(出力なし)でこれを実行することもできます。次に、このWrapperTaskを使用して、2つの異なる負荷タスクが必要で、それぞれTransform1Transform2が必要になります。

    コンテナに追加...私のcronが実際に呼び出すものは、gitから最新のコードを取得し、必要に応じて新しいコンテナを作成し、その後ドッカーを実行するスクリプトです。私はいつも稼働している別の容器をluigidに持っています。毎日のドッカー実行は、その日に必要なパラメータでluigiタスクを呼び出すCMDを使用してコンテナ内のシェルスクリプトを実行します。

    +0

    しかし、Transform1がTransform2に依存する場合はどうなりますか?このようにして、両方とも呼び出すために単一のラッパータスクを使用することはできません。条件付きで出力を変更する方法もわかりません。 – prcastro

    +0

    Transform1の 'requires()'にTransform2を追加できます。たとえWrapperTaskが両方を必要とするとしても、luigiは正しいグラフを見つけ出すでしょう。出力を条件付きにするのはちょっとハッキリです。おそらくWrapperTaskを持っているほうがいいでしょう。 – MattMcKnight