2016-08-04 13 views
0

以前は、PCollectionのformattedResultsがありました。データフローからBigQueryにデータを挿入

    // OPTION 1 
PCollection<TableRow> formattedResults = .... 
formattedResults.apply(BigQueryIO.Write.named("Write").to(tableName) 
          .withSchema(tableSchema) 
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); 

そして、すべての行が直接ここまですべてうまく、BigQueryの中に挿入したと私は大きなクエリで行を挿入するためのコードの下に使用していました。しかし、今、私は動的ので、テーブル名とその行を識別するために始めている、以下のようPCollectionを作成しています:(文字列値として、テーブルの名前と、その行になります)

PCollection<KV<String, TableRow>> tableRowMap // OPTION 2 

また、私は行のグループを作成していますこれと同じテーブルに移動します:

キー(文字列)BQテーブル名であり、値はBQに挿入される行のリストである
PCollection<KV<String, Iterable<TableRow>>> groupedRows //OPTION 3 

オプション1では、上のコードを使用してBQに行を簡単に挿入できましたが、この場合はOPTION 2またはOPTION 3で同じコードを使用することはできません。 OPTION 2またはOPTION 3を使用してテーブルに行を挿入する方法はありますか。リンクやコードサンプルは大きな助けになります。

答えて

1

Dataflowがウィンドウごとにテーブルに書き込む最も近いものです。独自のBoundedWindowサブクラスとWindowFnを作成して、ウィンドウに必要なデータを含めることができます。これを行うには、BigQueryIO.Writeの

to(SerializableFunction<BoundedWindow,String> tableSpecFunction) 

を使用してください。

この機能はBigQueryのストリーミングアップロード機能を使用しています。この機能は、表あたり100MB/sに制限されています。さらに、アップロードはアトミックではないため、失敗したバッチジョブは出力の一部のみをアップロードすることがあります。

-1

BigQueryIO.Writeに頼るのではなく、bigqueryに直接データを挿入する独自のDoFnを作成するオプションもあります。 技術的には、BigQueryTableInserterを作成する必要があります。insertAll(TableReference ref, List<TableRow> rowList)を使用して、目的の表に項目を挿入できます。

あなたのような何か使用してTableReferenceを作成することができます。これはBigQueryIOは、スループットを最大化するために挿入する必要がある行を分割するためにいくつかの素晴らしいものを行い、適切に再試行を処理するようお勧め100%ではありません new TableReference().setProjectId("projectfoo").setDatasetId("datasetfoo").setTableId("tablefoo")

を。

関連する問題