2017-09-22 8 views
4

私は私が進むべき道がわからないよエアフローを使用してmysqlレコードを取得して処理する方法は?

1. run a select query on MYSQL DB and fetch the records.    
2. Records are processed by python script. 

する必要があります。 xcomはここに行く道ですか?また、MYSQLOperatorはクエリを実行するだけで、レコードをフェッチしません。私が使うことができるinbuilt転送演算子はありますか?ここでMYSQLフックを使うには?

あなたは、データを取得するためのフックを使用していますPythonOperatorを使用 は、変換を適用し、バックいくつかの他の場所を(今決めた)行を出荷することをお勧めします。

誰かが同じことを進める方法を説明することはできますか。

参照してください - http://markmail.org/message/x6nfeo6zhjfeakfe

def do_work(): 
    mysqlserver = MySqlHook(connection_id) 
    sql = "SELECT * from table where col > 100 " 
    row_count = mysqlserver.get_records(sql, schema='testdb') 
    print row_count[0][0] 

callMYSQLHook = PythonOperator(
    task_id='fetch_from_testdb', 
    python_callable=mysqlHook, 
    dag=dag 
) 

これは続行するための正しい方法ですが? また、xcomsを使用して、次のMySqlOperatorのレコードを保存する方法はありますか?

t = MySqlOperator(
conn_id='mysql_default', 
task_id='basic_mysql', 
sql="SELECT count(*) from table1 where id > 10", 
dag=dag) 
+1

代わりMySqlHookを使用しての、あなたはXCOMにフィットするようには思えない大量のデータを取得したいので、 PythonOperatorで独自の関数を作成し、sql接続とクエリを手助けするために 'sqlalchemy'を使用できますか? – Chengzhi

+0

@Chengzhiこれは現在私がやっていることですが、その後Airflowを使う目的を破ってしまいます。他の回避策はありますか? – gpk27

+0

私はこれが気流を使う目的を破るとは思わない。物事に関する演算子演算子(MySQL演算子はMySQLデータベース上で動作します)。 Pythonでデータベースの各レコードの演算子を使用する場合は、 'PythonOperator'を使用する必要があります。私は、sqlalchemyのような低レベルのパッケージを使用する大規模なPythonスクリプトを作成するのを恐れません。時には、それはあなたが行う必要がある場合は、単に最高のことです。あなたの問題は、あなたが本当に1つのタスクであるものから2つのタスクを作ろうとしていることだと思います。 – Mike

答えて

関連する問題