2017-05-26 25 views
0

気流が自分のニーズに適しているかどうかを評価しています(バイオインフォマティクス)。私は気流モデルでいくつかの問題を抱えています。具体的には:特定のディレクトリで手動でAirflow DAGを実行する方法

  • DAGファイルは実際にどこで実行されますか?そのコンテキストは何ですか?入力データをDAG定義ファイルに渡すにはどうすればよいですか? (たとえば、ディレクトリ内の各ファイルのタスクを作成しない)
  • アドホックベースでDAGを実行するにはどうすればよいですか。 DAG構築のパラメータをどのように渡すのですか?

ここに私が実行したいものの例があります。私はちょうどいくつかの共有ファイルシステムで利用可能な20のファイルを含むディレクトリとしていくつかのデータを受け取ったとします。私は、20個のファイルのそれぞれに対して特定のbashコマンドを実行し、結果の一部を結合し、さらに処理を実行するDAGパイプラインを実行したい。 DAGはファイルシステム上のパスを必要とし、ディレクトリ内のファイルをリストして、それぞれのタスクを構築します。

DAGの全面的な前提を動的に構築できるのであれば、あるタスクから別のタスクにメタデータを渡す必要はありません(私が理解できるのはXComです)。しかし、どのように私がDAG構築への道を渡すことができるかは私には分かりません。

別の言い方をすれば、私は

dag = DAG(...) 
for file in glob(input_path): 
    t = BashOperator(..., dag=dag) 

のようなものが含まれるように、私のDAGの定義が欲しいどのように私は私が手動でDAGをトリガしたい場合input_pathが渡されたのですか?

私はまた、実際にはcronスタイルのスケジューリングの必要はありません。

+0

ルイージの類似機能は、CLIでパラメータを指定することです –

答えて

0

input_pathについては、気流変数を使用してDAGに渡すことができます。

input_path = Variable.get("INPUT_PATH")

変数UIを通る空気流CLIまたは手動を使用してインポートすることができる:DAGファイルで使用されるコードの例。

あなたはこのタイプのロジックのためsubdagを使用する必要があります。

dag = DAG(...) for file in glob(input_path): t = BashOperator(..., dag=dag)

SubDAGsは、パターンを繰り返すに最適です。 DAGオブジェクトを返す関数を定義することは、気流を使用するときの素晴らしいデザインパターンです。

関連する問題