私は私が進むべき道がわからないよエアフローを使用してmysqlレコードを取得して処理する方法は?
1. run a select query on MYSQL DB and fetch the records.
2. Records are processed by python script.
する必要があります。 xcomはここに行く道ですか?また、MYSQLOperatorはクエリを実行するだけで、レコードをフェッチしません。私が使うことができるinbuilt転送演算子はありますか?ここでMYSQLフックを使うには?
あなたは、データを取得するためのフックを使用していますPythonOperatorを使用 は、変換を適用し、バックいくつかの他の場所を(今決めた)行を出荷することをお勧めします。
誰かが同じことを進める方法を説明することはできますか。
参照してください - http://markmail.org/message/x6nfeo6zhjfeakfe
def do_work():
mysqlserver = MySqlHook(connection_id)
sql = "SELECT * from table where col > 100 "
row_count = mysqlserver.get_records(sql, schema='testdb')
print row_count[0][0]
callMYSQLHook = PythonOperator(
task_id='fetch_from_testdb',
python_callable=mysqlHook,
dag=dag
)
これは続行するための正しい方法ですが? また、xcomsを使用して、次のMySqlOperatorのレコードを保存する方法はありますか?
t = MySqlOperator(
conn_id='mysql_default',
task_id='basic_mysql',
sql="SELECT count(*) from table1 where id > 10",
dag=dag)
代わりMySqlHookを使用しての、あなたはXCOMにフィットするようには思えない大量のデータを取得したいので、 PythonOperatorで独自の関数を作成し、sql接続とクエリを手助けするために 'sqlalchemy'を使用できますか? – Chengzhi
@Chengzhiこれは現在私がやっていることですが、その後Airflowを使う目的を破ってしまいます。他の回避策はありますか? – gpk27
私はこれが気流を使う目的を破るとは思わない。物事に関する演算子演算子(MySQL演算子はMySQLデータベース上で動作します)。 Pythonでデータベースの各レコードの演算子を使用する場合は、 'PythonOperator'を使用する必要があります。私は、sqlalchemyのような低レベルのパッケージを使用する大規模なPythonスクリプトを作成するのを恐れません。時には、それはあなたが行う必要がある場合は、単に最高のことです。あなたの問題は、あなたが本当に1つのタスクであるものから2つのタスクを作ろうとしていることだと思います。 – Mike