私は最初の行としてヘッダーを含むcsvファイルを持っています。私はそれを読んで、BigQueryのカラム要件に合うようにこれらのヘッダを整理しています。しかし、パイプラインが始まるの前にスキーマへの参照が必要です。 BigQueryIO.Writeがこのようにヘッダーに応答できるようにするためのベストプラクティスは何ですか?現在、私のコードは次のようなものになります。私は現在、次のように(おおよそ)見て、おそらく2つのパイプラインを実装しようとしているBigQueryIO.Writeの前にCSVヘッダーを読み込んで変換するにはどうすればよいですか?
//create table
Table table = new Table();
// Where logically should the following line go?
TableSchema customSchema = ?
table.setSchema(customSchema);
TableReference tableRef = new TableReference();
tableRef.setDatasetId("foo_dataset");
tableRef.setProjectId("bar_project");
tableRef.setTableId("baz_table");
table.setTableReference(tableRef);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("ReadCSV").from("gs://bucket/file.csv"))
// Detect if it's header row
.apply(ParDo.of(new ExtractHeader()))
.apply(ParDo.of(new ToTableRow())
.apply(BigQueryIO.Write.named("Write")
.to(tableRef)
// Where logically should the following line go?
.withSchema(customSchema));
p.run();
を、しかし、実行順序は、データフローに私が取得していますエラーがどこBQ信頼できませんテーブルは存在しません。
PCollection readIn = p.apply(TextIO.Read.named("ReadCSV").from("gs://bucket/file.csv"))
.apply(ParDo.of(new ExtractHeader()));
TableSchema customSchema = /* generate schema based on what I now know the headers are */
readIn.apply(ParDo.of(new ToTableRow())
.apply(BigQueryIO.Write.named("Write")
.to(tableRef)
// Where logically should the following line go?
.withSchema(customSchema));
p.run();
こんにちは@Mitch、あなたがExtractHeader()で何をするのか尋ねることはできますか?私は同じことをやろうとしていますが、ヘッダー情報を取得して次の変換(あなたの場合はToTableRow)で使用する方法を理解できません。 –
@NorioAkagi ExtractHeaderは、特定の条件に一致するように各行を探し、その行だけを返します。残念ながら、これは動作しません。私は今このアプローチを断念しなければなりませんでしたが、Apache Beam 2.xで新しい 'DynamicDestinations'を使って同様の結果を得ることができます。お役に立てれば。 https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.html –
「DynamicDestinations」は、動的に変更する場合に便利です。行に応じてBQテーブルが、私の理解のヘッダーではありません。私の場合、サードパーティ製のPaaSからデータをインポートする必要があり、csv形式はコントロールできません。また、1つのcsvのすべての行に特定のフィールドが含まれていないと、csvファイル全体からフィールドが削除されるため、ヘッダーを読み取り、各CSVごとのデータ構造を理解する。 –