私はStorageにCSVファイルを持っており、それを読み込んでBigQuery Tableに書きたいと思います。これは私のコードであるGCP Dataflow-ストレージからCSVファイルを読み込み、BigQueryに書き込む
GroupName,Groupcode,GroupOwner,GroupCategoryID
System Administrators,sysadmin,13456,100
Independence High Teachers,HS Teachers,,101
John Glenn Middle Teachers,MS Teachers,13458,102
Liberty Elementary Teachers,Elem Teachers,13559,103
1st Grade Teachers,1stgrade,,104
2nd Grade Teachers,2nsgrade,13561,105
3rd Grade Teachers,3rdgrade,13562,106
Guidance Department,guidance,,107
Independence Math Teachers,HS Math,13660,108
Independence English Teachers,HS English,13661,109
John Glenn 8th Grade Teachers,8thgrade,,110
John Glenn 7th Grade Teachers,7thgrade,13452,111
Elementary Parents,Elem Parents,,112
Middle School Parents,MS Parents,18001,113
High School Parents,HS Parents,18002,114
::ジョブが実行を開始したとき、私はそこにある参照 1):
public class StorgeBq {
public static class StringToRowConverter extends DoFn<String, TableRow> {
private String[] columnNames;
private boolean isFirstRow = true;
@ProcessElement
public void processElement(ProcessContext c) {
TableRow row = new TableRow();
String[] parts = c.element().split(",");
if (isFirstRow) {
columnNames = Arrays.copyOf(parts, parts.length);
isFirstRow = false;
} else {
for (int i = 0; i < parts.length; i++) {
row.set(columnNames[i], parts[i]);
}
c.output(row);
}
}
}
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(DataflowPipelineOptions.class);
options.setZone("europe-west1-c");
options.setProject("mydata-dev");
options.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from("gs://mydata3-dataflow/C2ImportGroupsSample.csv"))
.apply("ConverToBqRow",ParDo.of(new StringToRowConverter()))
.apply("WriteToBq", BigQueryIO.<TableRow>writeTableRows()
.to("mydata-dev:DF_TEST.dataflow_table")
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withCreateDisposition(CreateDisposition.CREATE_NEVER));
p.run().waitUntilFinish();
}
}
いくつかの問題があります。最初の行はヘッダであるところ、これは私のCSVファイルであります私のコードで定義していない "DropInputs"と呼ばれるプロセスすべてのタスクの前に実行を開始する理由は何ですか?
2)piplineが最初のタスク "ReadLines"で開始しないのはなぜですか? 3)ログファイルでは、 "WriteToBq"タスクでデータの1つをフィールドとして見つけようとしていることがわかります。たとえば、 "1st Grade Teachers"はフィールドではなく "GroupName"のデータです。
"message" : "JSON parsing error in row starting at position 0: No such field: 1st Grade Teachers.",
ジョブIDを持っていますか?私はDropInputsのことがここに現れるはずだとは思わない。 – jkff