2017-08-16 6 views
0

GCSバケットに入っているファイルを処理し、一部のBigQueryテーブルにデータをダンプするApache Beamプログラムがあります。ファイルに応じて、切り捨てまたは追加操作を設定したいと思います。これは動的か設定可能にすることができますか? ありがとうございます。Apache BeamでWRITE_TRUNCATEとWRITE_APPENDを動的にします

答えて

0

は、私はあなたが「ファイルに応じて、」を言うとき、あなたはファイルに関するいくつかの情報を持っていることを前提とし、あなたのパイプラインで(ときWRITE_TRUNCATEWRITE_APPENDを使用するかを認識するために)。行うには

最も簡単な方法は、(フィルタリングによって)2 PCollectionsにあなたがBigQueryのに渡している入力を分割し、適切なBigQueryのシンク(WRITE_TRUNCATEWRITE_APPENDと1と1)にそれらのそれぞれを渡すことになります。

は、JavaやPython、擬似コード使用する場合は言及しなかった以下のPythonのためであるが、それは簡単にJavaのSDKにご返信用

files = (pipeline 
     | 'Read files' >> beam.io.Read(Your_GCS_Source()) 
     ) 
files_to_truncate = (files 
     | beam.Filter(lambda file: filter_for_files_to_truncate()) 
     | beam.io.Write(beam.io.BigQuerySink(output_table, schema=output_schema, create_disposition=create_disposition, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) 
    ) 

files_to_append = (files 
     | beam.Filter(lambda file: filter_for_files_to_append()) 
     | beam.io.Write(beam.io.BigQuerySink(output_table, schema=output_schema, create_disposition=create_disposition, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)) 
    ) 
+0

感謝を移植することができますが、私は書く必要があります大規模なクエリテーブル内のファイル全体のデータ...だから基本的に私は入力として単一のファイルを取得し、大きなクエリにそのデータをダンプする汎用的なプログラムを持っている...と私はそれですべての操作が何を実行するのかを知ることができます...したがって、私は自分の設定が何を言っているかに応じて、ファイル全体に追加または切り捨てを適用する必要があります... PS私は言語としてJavaを使用しています... – rish0097

+0

この設定テーブルをサイド入力(追加のpcollection)としてビームパイプラインに入れて、ファイルをフィルタリング(分割)することができます –

+0

分割したくありませんファイル。 WriteDispositionを作成しているときにサイド入力を渡すことはできないと思うので、上記の例を挙げてください。 – rish0097

関連する問題