1
A
答えて
1
PythonOperator
を使用してファイルを行単位で処理することをお勧めします。私は、私がポーリングを行い、ファイルのSFTPサーバーを使用するユースケースを持っています。見つけたときに、私はそれらを行ごとに処理し、結果をJSONとして書き出します。私はこのような何かがあなたのために働く可能性があるなど、YYYY-MM-DD形式にパース日付のようなものを実行します。
def csv_file_to_kafka(**context):
f = '/path/to/downloaded/csv_file.csv'
csvfile = open(f, 'r')
reader = csv.DictReader(csvfile)
for row in reader:
"""
Send the row to Kafka
"""
return
csv_file_to_kafka = PythonOperator(
task_id='csv_file_to_kafka',
python_callable=csv_file_to_kafka,
dag=dag
)
は今、それはあなたがファイルをダウンロードするにはなるだろうどのようにあなた次第本当にです。私の場合、SSHHook
とGoogleCloudStorageHook
を使ってSFTPサーバーからファイルを取得し、ファイルの名前をcsvファイルの解析とクリーンアップを行うタスクに渡します。私はSFTPからファイルをプルダウンし、Googleのクラウドストレージにそれらを置くことによって、この操作を行います。
"""
HOOKS: Connections to external systems
"""
def sftp_connection():
"""
Returns an SFTP connection created using the SSHHook
"""
ssh_hook = SSHHook(ssh_conn_id='sftp_connection')
ssh_client = ssh_hook.get_conn()
return ssh_client.open_sftp()
def gcs_connection():
"""
Returns an GCP connection created using the GoogleCloudStorageHook
"""
return GoogleCloudStorageHook(google_cloud_storage_conn_id='my_gcs_connection')
"""
PYTHON CALLABLES: Called by PythonOperators
"""
def get_files(**context):
"""
Looks at all files on the FTP server and returns a list files.
"""
sftp_client = sftp_connection()
all_files = sftp_client.listdir('/path/to/files/')
files = []
for f in all_files:
files.append(f)
return files
def save_files(**context):
"""
Looks to see if a file already exists in GCS. If not, the file is downloaed
from SFTP server and uploaded to GCS. A list of
"""
files = context['task_instance'].xcom_pull(task_ids='get_files')
sftp_client = sftp_connection()
gcs = gcs_connection()
new_files = []
new_outcomes_files = []
new_si_files = []
new_files = process_sftp_files(files, gcs, sftp_client)
return new_files
def csv_file_to_kafka(**context):
"""
Untested sample parse csv files and send to kafka
"""
files = context['task_instance'].xcom_pull(task_ids='save_files')
for f in new_files:
csvfile = open(f, 'r')
reader = csv.DictReader(csvfile)
for row in reader:
"""
Send the row to Kafka
"""
return
get_files = PythonOperator(
task_id='get_files',
python_callable=get_files,
dag=dag
)
save_files = PythonOperator(
task_id='save_files',
python_callable=save_files,
dag=dag
)
csv_file_to_kafka = PythonOperator(
task_id='csv_file_to_kafka',
python_callable=csv_file_to_kafka,
dag=dag
)
私はすべての呼び出し可能な一つの大きなpythonでこれを行うことが知っている、それは内なるように、私は今、コードをリファクタリングしています方法です呼び出し可能です。したがって、SFTPサーバーをポーリングし、最新のファイルを取得し、私のルールに従って単一のPython関数で解析します。私は、XComを使用することは理想的ではないと聞いていますが、Airflowタスクはあまりにも多くの、おそらく互いに通信することは想定されていません。
あなたのユースケースに応じて、Apache Nifiのようなものを探索したいと思うかもしれませんが、私は実際にそれを今も見ています。
関連する問題
- 1. KafkaでSpring Cloud Dataflowを使用してCSVファイルをストリーム
- 2. Dockerを使用してDC/OSで気流を流す
- 3. kafkaストリームを使用してJSON値を使用するkafkaストリームでjson apiを接続する:JAVA
- 4. Kafkaストリームを使用したワーク配布
- 5. 気流UIを使用してDAG全体を実行
- 6. Kafkaストリームを使用してSeqを抽出する
- 7. ドッカー内の気流がドッカーコンテナを使用しています
- 8. ストリームDSLを使用したKafkaストリームのストリーム/テーブルタイムスタンプ抽出器ごとの指定
- 9. 気流を使用してPostgres/MySQLからS3にデータを移動
- 10. 気流とドッキング用コンテナ
- 11. JSON用のCreatin kafkaストリームAPI
- 12. 大気を使用してガベージ文字としてストリームされたUnicode文字
- 13. 気流pythonクライアント
- 14. 気流Pythonユニットテスト?
- 15. カスタム変換にKafkaストリームを使用する
- 16. Bluemix Kafkaストリーム
- 17. kafka-streamsを使用してjson入力ストリームを条件付きでソートする
- 18. 気流DBセッションが気になる環境を提供していない
- 19. 気流XCOM KeyError: 'task_instance'
- 20. 気流をセロリーで使用する方法
- 21. 春の雲ストリームkafka
- 22. ストリームを使用してファイルをロードして処理する
- 23. 出力ストリームを使用してGoogleドライブにファイルをアップロード
- 24. iTextSharpを使用してPDFファイルを出力ストリームにマージする
- 25. Pythonを使用してファイルをストリームにパイプする方法
- 26. 気流パラメータを従属タスクに渡す
- 27. 空気流のログ用にs3を設定してください
- 28. ストリームを使用してファイルに保存するC++エラー
- 29. 直接ストリームを使用したKafka Sparkストリーミングでコンシューマグループを指定する方法
- 30. Kafkaはキーでストリームをストリームに結合する
あなたは本当にファイルを_streaming_していますか、それともそれらをバッチしていますか?気流は実際にはバッチ処理やマイクロバッチ処理をサポートしていますが、ストリーミングではそれほど大きくなく、基本的に_nano_-バッチ処理のように機能します。私はリモートホスト上のCSVファイルのために多くのポーリングを行い、それらをバッチとしてBigQueryに取り込みます。 – Mike
私はそれらを1行ずつ処理し、各行をkafkaに送ります。 – bsd