ファイルを読み込み、ファイルのフィールドにある日付の値に基づいて、BigQueryパーティションテーブルに書き込む必要があります。例えばファイルに7月25日と26日の2つの日付が含まれている場合、DataFlowはファイルに存在するデータに基づいてそのデータを2つのパーティションに書き込む必要があります。私は、エラーの下に取得していますプログラム上で動作している間DataFlowジョブを使用したパーティションテーブルのロード
public class StarterPipeline {
private static final Logger LOG =
LoggerFactory.getLogger(StarterPipeline.class);
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject("");
options.setTempLocation("gs://stage_location/");
Pipeline p = Pipeline.create(options);
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("id").setType("STRING"));
fields.add(new TableFieldSchema().setName("name").setType("STRING"));
fields.add(new TableFieldSchema().setName("designation").setType("STRING"));
fields.add(new TableFieldSchema().setName("joindate").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
PCollection<String> read = p.apply("Read Lines",TextIO.read().from("gs://hadoop_source_files/employee.txt"));
PCollection<TableRow> rows = read.apply(ParDo.of(new DoFn<String,TableRow>(){
@ProcessElement
public void processElement(ProcessContext c) {
String[] data = c.element().split(",");
c.output(new TableRow().set("id", data[0]).set("name", data[1]).set("designation", data[2]).set("joindate", data[3]));
}
}));
rows.apply(BigQueryIO.writeTableRows().to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
public String getDate(String value) {
return "project:dataset.DataFlow_Test$"+value;
}
@Override
public TableDestination apply(ValueInSingleWindow<TableRow> value) {
TableRow row = value.getValue();
String date = getDate(row.get("joindate").toString());
String tableSpec = date;
String tableDescription = "";
return new TableDestination(tableSpec, tableDescription);
}
}).withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
@Override
public TableRow apply(TableRow input) {
// TODO Auto-generated method stub
return input;
}
}).withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
p.run();
}
}
:スレッドの例外「メイン」org.apache.beam.sdk.Pipeline $ PipelineExecutionException:java.lang.IllegalArgumentExceptionが:表参照は[PROJECT_IDではありません]:[dataset_id]。[table_id]形式:原因:java.lang.IllegalArgumentException:テーブル参照が[project_id]:[dataset_id]。[table_id]フォーマットにありません。
ようこそ!実際のコメントにのみコメントを使用してください。追加の情報を追加するには、元の質問を[編集]する必要があります。 – DaveP