0
App-engine
で実行中のgoogle-cloud-dataflow
プロセスがあります。 pubsub
で送信されたメッセージをリッスンし、ストリームをbig-query
にストリームします。どのようにグーグルテーブルをクリアせずにアプリケーションエンジンで実行しているgoogle-cloud-dataflowを更新する
コードを更新しましたが、アプリを再実行しようとしています。
しかし、私はこのエラーが表示されます。
Exception in thread "main" java.lang.IllegalArgumentException: BigQuery table is not empty
は、テーブルを削除せずにデータフローを更新するために、とにかくはありますか? 私のコードはかなり頻繁に変わるかもしれません。私はテーブルのデータを削除したくありません。あなたが上withWriteDisposition
を指定する必要が
public class MyPipline {
private static final Logger LOG = LoggerFactory.getLogger(BotPipline.class);
private static String name;
public static void main(String[] args) {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("a").setType("string"));
fields.add(new TableFieldSchema().setName("b").setType("string"));
fields.add(new TableFieldSchema().setName("c").setType("string"));
TableSchema tableSchema = new TableSchema().setFields(fields);
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("my-data-analysis");
options.setStagingLocation("gs://my-bucket/dataflow-jars");
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> input = pipeline
.apply(PubsubIO.Read.subscription(
"projects/my-data-analysis/subscriptions/myDataflowSub"));
input.apply(ParDo.of(new DoFn<String, Void>() {
@Override
public void processElement(DoFn<String, Void>.ProcessContext c) throws Exception {
LOG.info("json" + c.element());
}
}));
String fileName = UUID.randomUUID().toString().replaceAll("-", "");
input.apply(ParDo.of(new DoFn<String, String>() {
@Override
public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
JSONObject firstJSONObject = new JSONObject(c.element());
firstJSONObject.put("a", firstJSONObject.get("a").toString()+ "1000");
c.output(firstJSONObject.toString());
}
}).named("update json")).apply(ParDo.of(new DoFn<String, TableRow>() {
@Override
public void processElement(DoFn<String, TableRow>.ProcessContext c) throws Exception {
JSONObject json = new JSONObject(c.element());
TableRow row = new TableRow().set("a", json.get("a")).set("b", json.get("b")).set("c", json.get("c"));
c.output(row);
}
}).named("convert json to table row"))
.apply(BigQueryIO.Write.to("my-data-analysis:mydataset.mytable").withSchema(tableSchema)
);
pipeline.run();
}
}