2017-07-25 6 views
0

ファイルを読み込み、ファイルのフィールドにある日付の値に基づいて、BigQueryパーティションテーブルに書き込む必要があります。例えばファイルに7月25日と26日の2つの日付が含まれている場合、DataFlowはファイルに存在するデータに基づいてそのデータを2つのパーティションに書き込む必要があります。私は、エラーの下に取得していますプログラム上で動作している間DataFlowジョブを使用したパーティションテーブルのロード

public class StarterPipeline { 
    private static final Logger LOG = 
     LoggerFactory.getLogger(StarterPipeline.class); 

    public static void main(String[] args) { 
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); 
    options.setProject(""); 
    options.setTempLocation("gs://stage_location/"); 
    Pipeline p = Pipeline.create(options); 

    List<TableFieldSchema> fields = new ArrayList<>(); 
    fields.add(new TableFieldSchema().setName("id").setType("STRING")); 
    fields.add(new TableFieldSchema().setName("name").setType("STRING")); 
    fields.add(new TableFieldSchema().setName("designation").setType("STRING")); 
    fields.add(new TableFieldSchema().setName("joindate").setType("STRING")); 
    TableSchema schema = new TableSchema().setFields(fields); 

    PCollection<String> read = p.apply("Read Lines",TextIO.read().from("gs://hadoop_source_files/employee.txt")); 

    PCollection<TableRow> rows = read.apply(ParDo.of(new DoFn<String,TableRow>(){ 
     @ProcessElement 
     public void processElement(ProcessContext c) { 
     String[] data = c.element().split(","); 

     c.output(new TableRow().set("id", data[0]).set("name", data[1]).set("designation", data[2]).set("joindate", data[3])); 
     } 
    })); 


    rows.apply(BigQueryIO.writeTableRows().to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() { 
     public String getDate(String value) { 
     return "project:dataset.DataFlow_Test$"+value; 
     } 

     @Override 
     public TableDestination apply(ValueInSingleWindow<TableRow> value) { 
     TableRow row = value.getValue(); 
     String date = getDate(row.get("joindate").toString()); 
     String tableSpec = date; 
     String tableDescription = ""; 
     return new TableDestination(tableSpec, tableDescription); 
     } 
    }).withFormatFunction(new SerializableFunction<TableRow, TableRow>() { 
     @Override 
     public TableRow apply(TableRow input) { 
     // TODO Auto-generated method stub 
     return input; 
     } 
    }).withSchema(schema) 
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); 

    p.run(); 
    } 
} 

:スレッドの例外「メイン」org.apache.beam.sdk.Pipeline $ PipelineExecutionException:java.lang.IllegalArgumentExceptionが:表参照は[PROJECT_IDではありません]:[dataset_id]。[table_id]形式:原因:java.lang.IllegalArgumentException:テーブル参照が[project_id]:[dataset_id]。[table_id]フォーマットにありません。

+2

ようこそ!実際のコメントにのみコメントを使用してください。追加の情報を追加するには、元の質問を[編集]する必要があります。 – DaveP

答えて

1

ビームは現在、日付パーティションテーブルをサポートしていません。この機能を追跡する問題については、BEAM-2390を参照してください。

0

は、私は、コードの下に使用してデータ内に存在する日付に基づいてパーティションテーブルにデータをロードすることができる午前:スタックオーバーフローへ

 rows.apply(BigQueryIO.writeTableRows().to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() { 
     @Override 
     public TableDestination apply(ValueInSingleWindow<TableRow> value) { 
      TableRow row = value.getValue(); 
      TableReference reference = new TableReference(); 
      reference.setProjectId(""); 
      reference.setDatasetId(""); 

      reference.setTableId("tabelname$" + row.get("datefield").toString()); 
      return new TableDestination(reference, null); 
     } 
     }).withFormatFunction(new SerializableFunction<TableRow, TableRow>() { 
     @Override 
     public TableRow apply(TableRow input) { 
      LOG.info("format function:"+input.toString()); 
      return input; 
     } 
     }).withSchema(schema) 
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); 
関連する問題