以前は、PCollectionのformattedResultsがありました。データフローからBigQueryにデータを挿入
// OPTION 1
PCollection<TableRow> formattedResults = ....
formattedResults.apply(BigQueryIO.Write.named("Write").to(tableName)
.withSchema(tableSchema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
そして、すべての行が直接ここまですべてうまく、BigQueryの中に挿入したと私は大きなクエリで行を挿入するためのコードの下に使用していました。しかし、今、私は動的ので、テーブル名とその行を識別するために始めている、以下のようPCollectionを作成しています:(文字列値として、テーブルの名前と、その行になります)
PCollection<KV<String, TableRow>> tableRowMap // OPTION 2
また、私は行のグループを作成していますこれと同じテーブルに移動します:
キー(文字列)BQテーブル名であり、値はBQに挿入される行のリストであるPCollection<KV<String, Iterable<TableRow>>> groupedRows //OPTION 3
。
オプション1では、上のコードを使用してBQに行を簡単に挿入できましたが、この場合はOPTION 2またはOPTION 3で同じコードを使用することはできません。 OPTION 2またはOPTION 3を使用してテーブルに行を挿入する方法はありますか。リンクやコードサンプルは大きな助けになります。