GCSバケットに入っているファイルを処理し、一部のBigQueryテーブルにデータをダンプするApache Beamプログラムがあります。ファイルに応じて、切り捨てまたは追加操作を設定したいと思います。これは動的か設定可能にすることができますか? ありがとうございます。Apache BeamでWRITE_TRUNCATEとWRITE_APPENDを動的にします
0
A
答えて
0
は、私はあなたが「ファイルに応じて、」を言うとき、あなたはファイルに関するいくつかの情報を持っていることを前提とし、あなたのパイプラインで(ときWRITE_TRUNCATE
とWRITE_APPEND
を使用するかを認識するために)。行うには
最も簡単な方法は、(フィルタリングによって)2 PCollections
にあなたがBigQueryのに渡している入力を分割し、適切なBigQueryのシンク(WRITE_TRUNCATE
とWRITE_APPEND
と1と1)にそれらのそれぞれを渡すことになります。
は、JavaやPython、擬似コード使用する場合は言及しなかった以下のPythonのためであるが、それは簡単にJavaのSDKにご返信用
files = (pipeline
| 'Read files' >> beam.io.Read(Your_GCS_Source())
)
files_to_truncate = (files
| beam.Filter(lambda file: filter_for_files_to_truncate())
| beam.io.Write(beam.io.BigQuerySink(output_table, schema=output_schema, create_disposition=create_disposition, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
)
files_to_append = (files
| beam.Filter(lambda file: filter_for_files_to_append())
| beam.io.Write(beam.io.BigQuerySink(output_table, schema=output_schema, create_disposition=create_disposition, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
)
関連する問題
- 1. Apache Beamバージョン2.2.0
- 2. Apache Beam/Flink ExceptionInChainedStubException
- 3. Apache BeamのDymanicDestination
- 4. Apache BeamのParDoとFlatMap?
- 5. Apache BeamとApache Nifiの相違点
- 6. Apache Beam「mvn package」はEclipse経由で動作しません
- 7. Apache Beam GroupByKeyは、DirectRunnerを使用すると決して起動しません。
- 8. Apache Beamメモリ内ストレージ
- 9. プロパティファイルのApache Beamオプション
- 10. Apache Beam TextIOワイルドカード(ローカルファイル)
- 11. Apache Beamでファイルをバイト[]として読み取るには?
- 12. Biqqueryにデータを読み込むwrite_truncateを使用してGoogle Dataflow/Beamを使用してパーティションテーブルを作成
- 13. Apache BeamでAzure Blobをサポートしていますか?
- 14. ビッグ・クエリでのWRITE_TRUNCATEの動作
- 15. Apache Beam Java SDKとPubSubソースの例
- 16. pythonでgzipファイルを開くApache Beam
- 17. Apache Beam取得元ファイル名
- 18. Python Apache BeamパイプラインステータスAPIコール
- 19. Apache beam Dataflow SDKエラー(例:
- 20. Apache Beam PubSub Readerの例外
- 21. Python Apache Beam Side入力アサーションエラー
- 22. データフローのカスタムApache Beam Pythonバージョン
- 23. PythonのApache Beam、beam.io.TextFileSourceのエラー
- 24. Apache Beamのタプルベースのウィンドウ
- 25. google-cloud-dataflow vs apache-beam
- 26. Google Cloud Dataflow(Apache Beam) - SideInputsをTextIO.writeで使用できますか?
- 27. PCollectionが空であることを確認する - Apache Beam
- 28. Apache BeamでのSparkRunnerのPythonサポート
- 29. Apache Beamのファイルに書き込む
- 30. Apache BeamのValueProviderの値を抽出します
感謝を移植することができますが、私は書く必要があります大規模なクエリテーブル内のファイル全体のデータ...だから基本的に私は入力として単一のファイルを取得し、大きなクエリにそのデータをダンプする汎用的なプログラムを持っている...と私はそれですべての操作が何を実行するのかを知ることができます...したがって、私は自分の設定が何を言っているかに応じて、ファイル全体に追加または切り捨てを適用する必要があります... PS私は言語としてJavaを使用しています... – rish0097
この設定テーブルをサイド入力(追加のpcollection)としてビームパイプラインに入れて、ファイルをフィルタリング(分割)することができます –
分割したくありませんファイル。 WriteDispositionを作成しているときにサイド入力を渡すことはできないと思うので、上記の例を挙げてください。 – rish0097