2017-04-21 16 views
0

私は最初の行としてヘッダーを含むcsvファイルを持っています。私はそれを読んで、BigQueryのカラム要件に合うようにこれらのヘッダを整理しています。しかし、パイプラインが始まるの前にスキーマへの参照が必要です。 BigQueryIO.Writeがこのようにヘッダーに応答できるようにするためのベストプラクティスは何ですか?現在、私のコードは次のようなものになります。私は現在、次のように(おおよそ)見て、おそらく2つのパイプラインを実装しようとしているBigQueryIO.Writeの前にCSVヘッダーを読み込んで変換するにはどうすればよいですか?

//create table 
Table table = new Table(); 
// Where logically should the following line go? 
TableSchema customSchema = ? 
table.setSchema(customSchema); 
TableReference tableRef = new TableReference(); 
tableRef.setDatasetId("foo_dataset"); 
tableRef.setProjectId("bar_project"); 
tableRef.setTableId("baz_table"); 
table.setTableReference(tableRef); 

Pipeline p = Pipeline.create(options); 

p.apply(TextIO.Read.named("ReadCSV").from("gs://bucket/file.csv")) 
    // Detect if it's header row 
    .apply(ParDo.of(new ExtractHeader())) 
    .apply(ParDo.of(new ToTableRow()) 
    .apply(BigQueryIO.Write.named("Write") 
    .to(tableRef) 
    // Where logically should the following line go? 
    .withSchema(customSchema)); 
p.run(); 

を、しかし、実行順序は、データフローに私が取得していますエラーがどこBQ信頼できませんテーブルは存在しません。

PCollection readIn = p.apply(TextIO.Read.named("ReadCSV").from("gs://bucket/file.csv")) 
    .apply(ParDo.of(new ExtractHeader())); 
TableSchema customSchema = /* generate schema based on what I now know the headers are */ 
readIn.apply(ParDo.of(new ToTableRow()) 
    .apply(BigQueryIO.Write.named("Write") 
    .to(tableRef) 
    // Where logically should the following line go? 
    .withSchema(customSchema)); 
p.run(); 
+0

こんにちは@Mitch、あなたがExtractHeader()で何をするのか尋ねることはできますか?私は同じことをやろうとしていますが、ヘッダー情報を取得して次の変換(あなたの場合はToTableRow)で使用する方法を理解できません。 –

+0

@NorioAkagi ExtractHeaderは、特定の条件に一致するように各行を探し、その行だけを返します。残念ながら、これは動作しません。私は今このアプローチを断念しなければなりませんでしたが、Apache Beam 2.xで新しい 'DynamicDestinations'を使って同様の結果を得ることができます。お役に立てれば。 https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.html –

+0

「DynamicDestinations」は、動的に変更する場合に便利です。行に応じてBQテーブルが、私の理解のヘッダーではありません。私の場合、サードパーティ製のPaaSからデータをインポートする必要があり、csv形式はコントロールできません。また、1つのcsvのすべての行に特定のフィールドが含まれていないと、csvファイル全体からフィールドが削除されるため、ヘッダーを読み取り、各CSVごとのデータ構造を理解する。 –

答えて

1

この機能(動的スキーマ)は現在レビュー中ですhttps://github.com/apache/beam/pull/2609(私はそれを検討しています)。進行中のPRを試すことができますが、そのAPIはレビューの結果、多少変更される可能性があることに注意してください。 PRが提出されたら、この回答を更新します。

+0

ありがとう@jkffと私の質問の要点になってくれてありがとう!私は実際に2.0.0を待つ作業が必要ですが、Beamが安定したらすぐに実装します。その間、最初にファイルを読み込み、ヘッダーを取り除き、その情報をパイプラインに渡すのは "データフローの方法"ですか? –

+1

はい、最初にファイルを読み込んでスキーマを理解し、そこに(既に知られている)スキーマを渡す新しいパイプラインを作成する必要があると思います。 – jkff

+0

私はこれが2.0.0の 'DynamicDestinations'を介して実装されていると仮定していますか?もしそうなら、やっぱり! –

関連する問題