2017-05-18 17 views
1

以下のスキーマで説明されているように、気流で条件付きタスクを作成したいと思います。気流:避けるタスク状態=スキップ

  • タスク1:タスク1が成功した場合Start_clusterは、
  • を実行タスク2が成功した場合、タスク2は、すべてのタスクが成功した場合、タスク3
  • を実行し、
  • を実行または予想されるシナリオは以下のとおりです。一つのタスクが失敗し、[タスク4を実行します。

enter image description here

terminate_cluster私が試しました: all_doneあまりに

trigger_rule=TriggerRule.ALL_DONE 

とスキップされた状態で

trigger_rule=TriggerRule.ONE_FAILED 

タスク4滞在は、私はこのソリューションfoud:How to create a conditional task in Airflowを、それは私のために動作しません。

答えて

1

以前のタスクが成功したかどうかにかかわらず、クラスタを終了したいと思うので、ALL_DONEが適切に聞こえます。 Start_Clusterは別として。それが失敗すると、終了するクラスタがないかもしれませんが、場合によってはチェック/試してみる必要があります。

デフォルトのtrigger_ruleはALL_SUCCESSです。たとえば、タスク1が失敗した場合、タスク2はタスク1が正常に実行されるため、Dag全体が失敗します。

タスクが失敗する可能性がありますが、それでもクラスタを終了したい場合は、ダグを追跡するための代替パスが必要になります。 PythonBranchOperatorを使用し、Pythonコールバック関数を使用します。

もう1つの方法は、trigger_ruleが「ONE_FAILURE」で実行されているダミー演算子を使用してから、「Terminate Cluster」タスクを実行することです。

あなたはダミータスク「Task_Failureを」という名前の場合たとえば、これは、依存関係のチェーンのようになります。そのシナリオで

Start_Cluster >> Task_2 >> Task_3 >> Terminate_Cluster 
Task_2 >> Task_Failure 
Task_3 >> Task_Failure 
Task_Failure >> Terminate_Cluster 

、Task_Failureだろう、おそらくそれはいくつかの可能ですので、ONE_SUCCESSにTerminate_Clusterのtrigger_ruleを設定する必要がありますタスクは決して実行されません。最後のタスクをALL_DONEに設定し、以前のタスクのいくつかにステータスがない場合は、ハングアップするか失敗する可能性があります。

ALL_DONEとALL_SUCCESSの違い:https://stackoverflow.com/a/47716981/1335793

関連する問題