2016-10-21 19 views

答えて

1

私が取ったアプローチは以下の通りです:

に基づいて、ウィンドウを設定

p.apply(PubsubIO.Read 
      .subscription(subscription) 
      .withCoder(TableRowJsonCoder.of()) 
     ) 
     .apply(Window.into(new TablePartitionWindowFn())) 
     .apply(BigQueryIO.Write 
         .to(new DayPartitionFunc(dataset, table)) 
         .withSchema(schema) 
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
     ); 

テーブル名にウィンドウを変換し、着信記録

  • ためのウィンドウを設定し

    • 開始値がパーティションの設定に使用されるため、着信データ、エンドインスタントは無視できます。

      public class TablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> { 
      
      private IntervalWindow assignWindow(AssignContext context) { 
          TableRow source = (TableRow) context.element(); 
          String dttm_str = (String) source.get("DTTM"); 
      
          DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC(); 
      
          Instant start_point = Instant.parse(dttm_str,formatter); 
          Instant end_point = start_point.withDurationAdded(1000, 1); 
      
          return new IntervalWindow(start_point, end_point); 
      }; 
      

      動的テーブルのパーティションを設定する:

      public class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> { 
      
      String destination = ""; 
      
      public DayPartitionFunc(String dataset, String table) { 
          this.destination = dataset + "." + table+ "$"; 
      } 
      
      @Override 
      public String apply(BoundedWindow boundedWindow) { 
          // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows. 
          String dayString = DateTimeFormat.forPattern("yyyyMMdd") 
                  .withZone(DateTimeZone.UTC) 
                  .print(((IntervalWindow) boundedWindow).start()); 
          return destination + dayString; 
      }} 
      

      は、同じ結果を達成するためのより良い方法があるなら、私に教えてください。

  • 関連する問題