2017-10-16 8 views
1

気流を使用してCSVファイルをカフカのトピックにストリームする最良の方法は何ですか?気流を使用してkafkaにファイルをストリーム

エアフロー用のカスタムオペレータの作成?

+0

あなたは本当にファイルを_streaming_していますか、それともそれらをバッチしていますか?気流は実際にはバッチ処理やマイクロバッチ処理をサポートしていますが、ストリーミングではそれほど大きくなく、基本的に_nano_-バッチ処理のように機能します。私はリモートホスト上のCSVファイルのために多くのポーリングを行い、それらをバッチとしてBigQueryに取り込みます。 – Mike

+0

私はそれらを1行ずつ処理し、各行をkafkaに送ります。 – bsd

答えて

1

PythonOperatorを使用してファイルを行単位で処理することをお勧めします。私は、私がポーリングを行い、ファイルのSFTPサーバーを使用するユースケースを持っています。見つけたときに、私はそれらを行ごとに処理し、結果をJSONとして書き出します。私はこのような何かがあなたのために働く可能性があるなど、YYYY-MM-DD形式にパース日付のようなものを実行します。

def csv_file_to_kafka(**context): 

    f = '/path/to/downloaded/csv_file.csv' 
    csvfile = open(f, 'r') 
    reader = csv.DictReader(csvfile) 

    for row in reader: 
     """ 
     Send the row to Kafka 
     """ 
    return 

csv_file_to_kafka = PythonOperator(
    task_id='csv_file_to_kafka', 
    python_callable=csv_file_to_kafka, 
    dag=dag 
) 

は今、それはあなたがファイルをダウンロードするにはなるだろうどのようにあなた次第本当にです。私の場合、SSHHookGoogleCloudStorageHookを使ってSFTPサーバーからファイルを取得し、ファイルの名前をcsvファイルの解析とクリーンアップを行うタスクに渡します。私はSFTPからファイルをプルダウンし、Googleのクラウドストレージにそれらを置くことによって、この操作を行います。

""" 
HOOKS: Connections to external systems 
""" 
def sftp_connection(): 
    """ 
    Returns an SFTP connection created using the SSHHook 
    """ 
    ssh_hook = SSHHook(ssh_conn_id='sftp_connection') 
    ssh_client = ssh_hook.get_conn() 
    return ssh_client.open_sftp() 
def gcs_connection(): 
    """ 
    Returns an GCP connection created using the GoogleCloudStorageHook 
    """ 
    return GoogleCloudStorageHook(google_cloud_storage_conn_id='my_gcs_connection') 

""" 
PYTHON CALLABLES: Called by PythonOperators 
""" 
def get_files(**context): 
    """ 
    Looks at all files on the FTP server and returns a list files. 
    """ 
    sftp_client = sftp_connection() 
    all_files = sftp_client.listdir('/path/to/files/') 
    files = [] 

    for f in all_files: 
     files.append(f) 

    return files 

def save_files(**context): 
    """ 
    Looks to see if a file already exists in GCS. If not, the file is downloaed 
    from SFTP server and uploaded to GCS. A list of 
    """ 
    files = context['task_instance'].xcom_pull(task_ids='get_files') 

    sftp_client = sftp_connection() 
    gcs = gcs_connection() 
    new_files = [] 
    new_outcomes_files = [] 
    new_si_files = [] 

    new_files = process_sftp_files(files, gcs, sftp_client) 

    return new_files 

def csv_file_to_kafka(**context): 
    """ 
    Untested sample parse csv files and send to kafka 
    """ 
    files = context['task_instance'].xcom_pull(task_ids='save_files') 
    for f in new_files: 
     csvfile = open(f, 'r') 
     reader = csv.DictReader(csvfile) 

     for row in reader: 
      """ 
      Send the row to Kafka 
      """ 
    return 

get_files = PythonOperator(
    task_id='get_files', 
    python_callable=get_files, 
    dag=dag 
) 
save_files = PythonOperator(
    task_id='save_files', 
    python_callable=save_files, 
    dag=dag 
) 
csv_file_to_kafka = PythonOperator(
    task_id='csv_file_to_kafka', 
    python_callable=csv_file_to_kafka, 
    dag=dag 
) 

私はすべての呼び出し可能な一つの大きなpythonでこれを行うことが知っている、それは内なるように、私は今、コードをリファクタリングしています方法です呼び出し可能です。したがって、SFTPサーバーをポーリングし、最新のファイルを取得し、私のルールに従って単一のPython関数で解析します。私は、XComを使用することは理想的ではないと聞いていますが、Airflowタスクはあまりにも多くの、おそらく互いに通信することは想定されていません。

あなたのユースケースに応じて、Apache Nifiのようなものを探索したいと思うかもしれませんが、私は実際にそれを今も見ています。

関連する問題