2005年から2010年までの日付ごとに1つのピクルファイルがあります。各ファイルには、その日付のそれぞれの頻度の単語の辞書が含まれています。私はまた、全期間にわたってすべてのユニークな単語を持つ "マスターファイル"を持っています。合計で約500万語があります。複数の入力を持つluigiタスクのアーキテクチャ
私はすべてのデータを取り、単語ごとに1つのCSVファイルを作成する必要があります。これは日付ごとに1つの行を持ちます。例えば、サンプルファイルsome_word.txt
のために:
2005-01-01,0.0003
2005-01-02,0.00034
2005-01-03,0.008
私はトラブルルイージフレームワークと、このプロセスを組織を抱えています。私の現在のトップレベルのタスクは単語を取り、日付ごとに関連する頻度を調べ、その結果をCSVファイルに保存します。私はマスターファイル内のすべての単語をループしてその単語でタスクを実行することができますが、それ以上の時間がかからなければ数ヶ月かかると推定しています。私のトップレベルのAggregateTokenFreqs
タスクは簡略版です。
class AggregateTokenFreqs(luigi.Task):
word = luigi.Parameter()
def requires(self):
pass # not sure what to require here, master file?
def output(self):
return luigi.LocalTarget('data/{}.csv'.format(self.word))
def run(self):
results = []
for date_ in some_list_of_dates:
with open('pickles/{}.p'.format(date_), 'rb') as f:
freqs = pickle.load(f)
results.append((date_, freqs.get(self.word))
# Write results list to output CSV file
実行する必要がある処理は何ですか?たとえば、新しい日のデータが到着したときに毎日のプロセスを再実行する計画ですか?あなたが一度だけ実行する必要がある場合は、おそらくluigiを実行する意味がありません。いずれにせよ、マルチプロセッシングを使う方が良いでしょう。 – MattMcKnight