mysql
  • python-3.x
  • airflow
  • 2017-10-10 17 views 5 likes 
    5
    def mysql_operator_test(): 
        DEFAULT_DATE = datetime(2017, 10, 9) 
        t = MySqlOperator(
         task_id='basic_mysql', 
         sql="SELECT count(*) from table 1 where id>100;", 
         mysql_conn_id='mysql_default', 
         dag=dag) 
        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=False) 
    
    run_this = PythonOperator(
        task_id='getRecoReq', 
        python_callable=mysql_operator_test, 
        # xcom_push=True, 
        dag=dag) 
    
    task2 = PythonOperator(
        task_id= 'mysql_select', 
        provide_context=True, 
        python_callable = blah, 
        templates_dict = {'requests': "{{ ti.xcom_pull(task_ids='getReq') }}" }, 
        dag=dag) 
    
    run_this.set_downstream(task2) 
    

    xcomsを使用してMySqlOperatorから返されたカウントを取得します。誰かが同じことを指導してもらえますか?MySqlOperatorでairflow xcomsを使用する方法

    答えて

    1

    あなたは非常に近いです!しかし、あなたがこの質問をしている様子は、ある種の反パターンです。 Airflowでタスク間でデータを共有したくない場合。また、あなたはmysql_operator_testのようにオペレーターを使用したくありません。それは魅力的です、私が始めていたときに私は同じことをしました。

    私はこれに非常に似たSFTP接続を試しました。私は結局、PythonOperatorの中のすべてをやっただけで、基礎をなすフックを使用しました。

    MySQLHookpython_callableの内側に使用することをおすすめします。このような何か:

    def count_mysql_and_then_use_the_count(): 
        """ 
        Returns an SFTP connection created using the SSHHook 
        """ 
        mysql_hook = MySQLHook(...) 
        cur = conn.cursor() 
        cur.execute("""SELECT count(*) from table 1 where id>100""") 
        for count in cur: 
         # Do something with the count... 
    

    私はこれがあるとして動作するかはわからないが、アイデアは呼び出し可能なあなたのPythonの内側にフックを使うで、私はしばしばMySQLHookを使用しませんが、私はこれをしませんでしたSSHHookとそれは素晴らしい仕事をしています。

    関連する問題