2017-07-14 6 views
2

GCSからBigQueryにCSVファイルを読み込む方法を理解しようとしています。下記のパイプライン:CSVファイルをGCSからBigQueryにインポートする

// Create the pipeline 
    Pipeline p = Pipeline.create(options); 

    // Create the PCollection from csv 
    PCollection<String> lines = p.apply(TextIO.read().from("gs://impression_tst_data/incoming_data.csv")); 


    // Transform into TableRow 
    PCollection<TableRow> row = lines.apply(ParDo.of(new StringToRowConverter())); 


    // Write table to BigQuery 
    row.apply(BigQueryIO.<TableRow>writeTableRows() 
      .to(“project_id:dataset.table”) 
      .withSchema(getSchema()) 
      .withWriteDisposition(WriteDisposition.WRITE_APPEND) 
      .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)); 

ここでは、私がのTableRow PCollectionを作成するために、パルドで使用しているStringToRowConverterクラスです:

これはしこりというJSONのTableRowsを作成するように見えるステージングファイルを見てみると
// StringToRowConverter 
static class StringToRowConverter extends DoFn<String, TableRow> { 
    @ProcessElement 
    public void processElement(ProcessContext c) { 
     c.output(new TableRow().set("string_field", c.element())); 
    } 
} 

csvを "string_field"という名前の単一の列に変換します。スキーマでstring_fieldを定義しないと、ジョブは失敗します。 string_fieldを定義すると、CSVの各行が列に書き込まれ、スキーマに定義されている他のすべての列は空のままになります。私はこれが期待される行動だと知っています。

私の質問:このJSON出力をどのようにしてスキーマに書き込むのですか?サンプル出力と、以下のスキーマ...

"string_field": "6/26/17 21:28,Dave Smith,1 Learning Drive,867-5309,etc"} 

スキーマ:

static TableSchema getSchema() { 
      return new TableSchema().setFields(new ArrayList<TableFieldSchema>() { 
       // Compose the list of TableFieldSchema from tableSchema. 
       { 
        add(new TableFieldSchema().setName("Event_Time").setType("TIMESTAMP")); 
        add(new TableFieldSchema().setName("Name").setType("STRING")); 
        add(new TableFieldSchema().setName("Address").setType("STRING")); 
        add(new TableFieldSchema().setName("Phone").setType("STRING")); 
        add(new TableFieldSchema().setName("etc").setType("STRING")); 
       } 
      }); 
     } 

StringToRowConverterを使用するよりも、これを行うための良い方法はありますか?

TableRow PCollectionを作成してBQに書き出す前に、ParDoを使用してそれを作成する必要があります。しかし、私はCSV PCollectionを取り込み、TableRowに変換して書き出す方法の実例を見つけることができません。

はい、私はここで学びたいと思っています。私は誰かがスニペットで私を助けたり、これを達成する最も簡単な方法で私に正しい方向を向けることを望んでいます。前もって感謝します。

答えて

1

StringToRowConverterDoFnのコードは、文字列を解析し、複数のフィールドを持つTableRowを生成する必要があります。各行はコンマで区切られているため、文字列をコンマで区切り、列の順序に関する知識を使って次のような操作を行います。

String inputLine = c.element(); 

// May need to make the line parsing more robust, depending on your 
// files. Look at how to parse rows of a CSV using Java. 
String[] split = inputLine.split(','); 

// Also, you may need to handle errors such as not enough columns, etc. 

TableRow output = new TableRow(); 
output.set("Event_Time", split[0]); // may want to parse the string 
output.set("Name", split[1]); 
... 
c.output(output); 
+0

CSVファイルに最初の手動でヘッダーをスキップする必要があります.TextIOによって生成された行のPCollectionは順序付けられていないので、どの行が「最初の行」であったかを知る方法がないため、何らかの理由でそれをフィルタリングする必要があります。 https://issues.apache.org/jira/browse/BEAM-123、 – jkff

+0

も参照してください。完璧、助けてくれてありがとう! –