2016-06-26 13 views
2

エアフローを設定し、いくつかの演算子を呼び出すDagsとsubDagを作成しました。オペレータの実行後にエアフローが発生する

私の問題は、オペレータがジョブを実行して終了したときに、結果をいくつかのPython構造体に戻したいということです。例えば :

File1.py

... 
    ... 
    sub_dag_one=SubDagOperator(subdag=subdag_accessHive(
PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, STEP, macros,path, 
     ), 
     task_id=DELP_DAG_NAME, 
     dag=dag, 
    ) 

File2.py

from airflow import DAG 
    from airflow.operators import HiveOperator 
def subdag_callHive(parent, child, args, step, 
         user_defined_macros, path 
         ): 
     dag_subdag = DAG(
      dag_id='%s.%s' % (parent, child), 
      default_args=args, 
      schedule_interval="@daily", 
      template_searchpath=path, 
      user_defined_macros=user_defined_macros, 
     ) 

     # some work... 

     HiveOperator(
      task_id='some_id', 
      hiveconf_jinja_translate=True, 
      hql='select field1 from public.mytable limit 4;', 
      trigger_rule='all_done', 
      dag=dag_subdag, 
     ) 

     return dag_subdag 
subdag_callHiveメインDAGは定義されている別のPythonスクリプトから呼び出され

機能と他のすべてのパラメータが必要です。

この場合、HiveOperator (* select * from public.mytable limit 4; *)の結果を得る必要があります。

返さdag_subdagは、オブジェクト<クラスのairflow.models.DAG '>で、コールが、HiveOperatorが何をしたかについての情報なしに渡されるすべての属性/データが含まれています。

これは可能ですか?だから、どうすれば達成できるのか?

答えて

4

必要に応じてフックを使用できます。基本的にHiveOperatorは同じことをしますが、Hive Hooksには結果を扱う複数のメソッドがあります。

PythonOperatorを使用して関数を呼び出し、ハイブフックを開始します。

次の例は役に立ちます。

コードスニペット:

callHook = PythonOperator(
    task_id='foo', 
    python_callable=do_work, 
    dag=dag 
) 

def do_work(): 
    hiveserver = HiveServer2Hook() 
    hql = "SELECT COUNT(*) FROM foo.bar" 
    row_count = hiveserver.get_records(hql, schema='foo') 
    print row_count[0][0] 

すべての利用可能な方法は、ここで見つけることができます:https://github.com/apache/incubator-airflow/blob/master/airflow/hooks/hive_hooks.py

関連する問題