GCSからBigQueryにCSVファイルを読み込む方法を理解しようとしています。下記のパイプライン:CSVファイルをGCSからBigQueryにインポートする
// Create the pipeline
Pipeline p = Pipeline.create(options);
// Create the PCollection from csv
PCollection<String> lines = p.apply(TextIO.read().from("gs://impression_tst_data/incoming_data.csv"));
// Transform into TableRow
PCollection<TableRow> row = lines.apply(ParDo.of(new StringToRowConverter()));
// Write table to BigQuery
row.apply(BigQueryIO.<TableRow>writeTableRows()
.to(“project_id:dataset.table”)
.withSchema(getSchema())
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
ここでは、私がのTableRow PCollectionを作成するために、パルドで使用しているStringToRowConverterクラスです:
これはしこりというJSONのTableRowsを作成するように見えるステージングファイルを見てみると// StringToRowConverter
static class StringToRowConverter extends DoFn<String, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(new TableRow().set("string_field", c.element()));
}
}
csvを "string_field"という名前の単一の列に変換します。スキーマでstring_fieldを定義しないと、ジョブは失敗します。 string_fieldを定義すると、CSVの各行が列に書き込まれ、スキーマに定義されている他のすべての列は空のままになります。私はこれが期待される行動だと知っています。
私の質問:このJSON出力をどのようにしてスキーマに書き込むのですか?サンプル出力と、以下のスキーマ...
"string_field": "6/26/17 21:28,Dave Smith,1 Learning Drive,867-5309,etc"}
スキーマ:
static TableSchema getSchema() {
return new TableSchema().setFields(new ArrayList<TableFieldSchema>() {
// Compose the list of TableFieldSchema from tableSchema.
{
add(new TableFieldSchema().setName("Event_Time").setType("TIMESTAMP"));
add(new TableFieldSchema().setName("Name").setType("STRING"));
add(new TableFieldSchema().setName("Address").setType("STRING"));
add(new TableFieldSchema().setName("Phone").setType("STRING"));
add(new TableFieldSchema().setName("etc").setType("STRING"));
}
});
}
StringToRowConverterを使用するよりも、これを行うための良い方法はありますか?
TableRow PCollectionを作成してBQに書き出す前に、ParDoを使用してそれを作成する必要があります。しかし、私はCSV PCollectionを取り込み、TableRowに変換して書き出す方法の実例を見つけることができません。
はい、私はここで学びたいと思っています。私は誰かがスニペットで私を助けたり、これを達成する最も簡単な方法で私に正しい方向を向けることを望んでいます。前もって感謝します。
CSVファイルに最初の手動でヘッダーをスキップする必要があります.TextIOによって生成された行のPCollectionは順序付けられていないので、どの行が「最初の行」であったかを知る方法がないため、何らかの理由でそれをフィルタリングする必要があります。 https://issues.apache.org/jira/browse/BEAM-123、 – jkff
も参照してください。完璧、助けてくれてありがとう! –