2017-10-28 5 views
1

ダウンロードする500のリンクがあり、たとえば10個のアイテムをバッチしたいと思います。luigiバッチモジュールで使用されているストレートバッチタスク

この疑似コードはどのようになりますか?

class BatchJobTask(luigi.Task) 
    items = luigi.Parameter() 
    def run(self): 
     listURLs = [] 
     with ('urls_chunk', 'r') as urls 
      for line in urls: 
       listURLs.append('http://ggg'+line+'.org') 
      10_urls = listURLs[0:items] #10 items here 
      for i in 10_urls: 
       req = request.get(url) 
       req.contents 
    def output(self): 
     return self.LocalTarger("downloaded_filelist.txt") 

class BatchWorker(luigi.Task) 
    def run(self) 
     # Here I should run BatchJobTask from 0 to 10, next 11 - 21 new etc... 

どのようになりますか?

+0

あなたのURLのリストはどこですか? – MattMcKnight

+0

私は最初の投稿 – GarfieldCat

+0

を更新しました。このURLのリストはどこに保存されていましたか?キュー内、データベース内、ファイル内?あなたがしなければならないことは、その中に何人がいるのかを把握し、そこからあなたのチャンクを構築することです。以下に例を示しますが、問題の関連部分を指定していないため、問題に関連する可能性は低いです。 – MattMcKnight

答えて

1

ここでは、ファイル内に別々の行として格納されている文字列のリストを使用して、必要なものを実行する方法を示します。

import luigi 
import requests 

BATCH_SIZE = 10 


class BatchProcessor(luigi.Task): 
    items = luigi.ListParameter() 
    max = luigi.IntParameter() 

    def requires(self): 
     return None 

    def output(self): 
     return luigi.LocalTarget('processed'+str(max)+'.txt') 

    def run(self): 
     for item in self.items: 
      req = requests.get('http://www.'+item+'.org') 
      # do something useful here 
      req.contents 
     open("processed"+str(max)+".txt",'w').close() 


class BatchCreator(luigi.Task): 
    file_with_urls = luigi.Parameter() 

    def requires(self): 
     required_tasks = [] 
     f = open(self.file_with_urls) 
     batch_index = 0 
     total_index = 0 
     lines = [] 
     while True: 
      line = f.readline() 
      if not line: break 
      total_index += 1 
      if batch_index < BATCH_SIZE: 
       lines.append(line) 
       batch_index += 1 
      else: 
       required_tasks.append(BatchProcessor(batch=lines)) 
       lines = [line] 
       batch_index = 1 
     return required_tasks 

    def output(self): 
     return luigi.LocalTarget(str(self.file_with_urls) + 'processed') 

    def run(self): 
     open(str(self.file_with_urls) + 'processed', 'w').close() 
1

これを行いました。

class GetListtask(luigi.Task) 
    def run(self): 
     ... 
    def output(self): 
    return luigi.LocalTarget(self.outputfile) 

class GetJustOneFile(luigi.Task): 
    fid = luigi.IntParameter() 
    def requires(self): 
     pass 

    def run(self): 
     url = 'http://my-server.com/test' + str(self.fid) + '.txt' 
     download_file = requests.get(url, stream=True) 
     with self.output().open('w') as downloaded_file: 
      downloaded_file.write(str(download_file.content)) 

    def output(self): 
     return luigi.LocalTarget("test{}.txt".format(self.fid)) 


class GetAllFiles(luigi.WrapperTask): 
    def requires(self): 
     listoffiles = [] # 0..999 
     for i in range(899): 
      listoffiles.append(i) 
     return [GetJustOneFile(fid=fileid) for fileid in listoffiles] 

このコードはひどいですか?

+0

まあ、バッチ処理はしませんが、うまくいくはずです。 – MattMcKnight

+0

定義済みリストの代わりにGetAllFilesのGetListTaskからファイルを入力するにはどうすればよいですか? – GarfieldCat

+0

これは私の 'BatchCreator'タスクの' requires'メソッドで示したものです。ファイルの各行が変化するURNコンポーネントであるファイルがあると仮定します。 – MattMcKnight

関連する問題