2016-11-03 6 views
0

App-engineで実行中のgoogle-cloud-dataflowプロセスがあります。 pubsubで送信されたメッセージをリッスンし、ストリームをbig-queryにストリームします。どのようにグーグルテーブルをクリアせずにアプリケーションエンジンで実行しているgoogle-cloud-dataflowを更新する

コードを更新しましたが、アプリを再実行しようとしています。

しかし、私はこのエラーが表示されます。

Exception in thread "main" java.lang.IllegalArgumentException: BigQuery table is not empty 

は、テーブルを削除せずにデータフローを更新するために、とにかくはありますか? 私のコードはかなり頻繁に変わるかもしれません。私はテーブルのデータを削除したくありません。あなたが上withWriteDispositionを指定する必要が

public class MyPipline { 
    private static final Logger LOG = LoggerFactory.getLogger(BotPipline.class); 
    private static String name; 

    public static void main(String[] args) { 

     List<TableFieldSchema> fields = new ArrayList<>(); 
     fields.add(new TableFieldSchema().setName("a").setType("string")); 
     fields.add(new TableFieldSchema().setName("b").setType("string")); 
     fields.add(new TableFieldSchema().setName("c").setType("string")); 
     TableSchema tableSchema = new TableSchema().setFields(fields); 

     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); 
     options.setRunner(BlockingDataflowPipelineRunner.class); 
     options.setProject("my-data-analysis"); 
     options.setStagingLocation("gs://my-bucket/dataflow-jars"); 
     options.setStreaming(true); 

     Pipeline pipeline = Pipeline.create(options); 

     PCollection<String> input = pipeline 
       .apply(PubsubIO.Read.subscription(
         "projects/my-data-analysis/subscriptions/myDataflowSub")); 

     input.apply(ParDo.of(new DoFn<String, Void>() { 

      @Override 
      public void processElement(DoFn<String, Void>.ProcessContext c) throws Exception { 
       LOG.info("json" + c.element()); 
      } 

     })); 
     String fileName = UUID.randomUUID().toString().replaceAll("-", ""); 


     input.apply(ParDo.of(new DoFn<String, String>() { 
      @Override 
      public void processElement(DoFn<String, String>.ProcessContext c) throws Exception { 
       JSONObject firstJSONObject = new JSONObject(c.element()); 
       firstJSONObject.put("a", firstJSONObject.get("a").toString()+ "1000"); 
       c.output(firstJSONObject.toString()); 

      } 

     }).named("update json")).apply(ParDo.of(new DoFn<String, TableRow>() { 

      @Override 
      public void processElement(DoFn<String, TableRow>.ProcessContext c) throws Exception { 
       JSONObject json = new JSONObject(c.element()); 
       TableRow row = new TableRow().set("a", json.get("a")).set("b", json.get("b")).set("c", json.get("c")); 
       c.output(row); 
      } 

     }).named("convert json to table row")) 
       .apply(BigQueryIO.Write.to("my-data-analysis:mydataset.mytable").withSchema(tableSchema) 
     ); 

     pipeline.run(); 
    } 
} 

答えて

2

あなたBigQueryIO.Write - ドキュメントof the methodof its argumentを参照してください。

は、ここに私のコードです。要件に応じて、WRITE_TRUNCATEまたはWRITE_APPENDが必要です。

関連する問題