0

私はStorageにCSVファイルを持っており、それを読み込んでBigQuery Tableに書きたいと思います。これは私のコードであるGCP Dataflow-ストレージからCSVファイルを読み込み、BigQueryに書き込む

GroupName,Groupcode,GroupOwner,GroupCategoryID 
System Administrators,sysadmin,13456,100 
Independence High Teachers,HS Teachers,,101 
John Glenn Middle Teachers,MS Teachers,13458,102 
Liberty Elementary Teachers,Elem Teachers,13559,103 
1st Grade Teachers,1stgrade,,104 
2nd Grade Teachers,2nsgrade,13561,105 
3rd Grade Teachers,3rdgrade,13562,106 
Guidance Department,guidance,,107 
Independence Math Teachers,HS Math,13660,108 
Independence English Teachers,HS English,13661,109 
John Glenn 8th Grade Teachers,8thgrade,,110 
John Glenn 7th Grade Teachers,7thgrade,13452,111 
Elementary Parents,Elem Parents,,112 
Middle School Parents,MS Parents,18001,113 
High School Parents,HS Parents,18002,114 

::ジョブが実行を開始したとき、私はそこにある参照 1):

public class StorgeBq { 

     public static class StringToRowConverter extends DoFn<String, TableRow> { 

      private String[] columnNames; 

      private boolean isFirstRow = true; 

      @ProcessElement 
      public void processElement(ProcessContext c) { 
       TableRow row = new TableRow(); 

       String[] parts = c.element().split(","); 

       if (isFirstRow) { 
        columnNames = Arrays.copyOf(parts, parts.length); 
        isFirstRow = false; 
       } else { 
        for (int i = 0; i < parts.length; i++) { 
         row.set(columnNames[i], parts[i]); 
        } 
        c.output(row); 
       } 
      } 
     } 

     public static void main(String[] args) { 

      DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() 
         .as(DataflowPipelineOptions.class); 
        options.setZone("europe-west1-c"); 
        options.setProject("mydata-dev"); 
        options.setRunner(DataflowRunner.class); 
        Pipeline p = Pipeline.create(options); 

      p.apply("ReadLines", TextIO.read().from("gs://mydata3-dataflow/C2ImportGroupsSample.csv")) 
      .apply("ConverToBqRow",ParDo.of(new StringToRowConverter())) 
      .apply("WriteToBq", BigQueryIO.<TableRow>writeTableRows() 
        .to("mydata-dev:DF_TEST.dataflow_table") 
        .withWriteDisposition(WriteDisposition.WRITE_APPEND) 
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)); 
      p.run().waitUntilFinish(); 
     } 

} 

いくつかの問題があります。最初の行はヘッダであるところ、これは私のCSVファイルであります私のコードで定義していない "DropInputs"と呼ばれるプロセスすべてのタスクの前に実行を開始する理由は何ですか? enter image description here

2)piplineが最初のタスク "ReadLines"で開始しないのはなぜですか? 3)ログファイルでは、 "WriteToBq"タスクでデータの1つをフィールドとして見つけようとしていることがわかります。たとえば、 "1st Grade Teachers"はフィールドではなく "GroupName"のデータです。

"message" : "JSON parsing error in row starting at position 0: No such field: 1st Grade Teachers.", 
+0

ジョブIDを持っていますか?私はDropInputsのことがここに現れるはずだとは思わない。 – jkff

答えて

1

コードにいくつか問題があります。しかし、まず、 "DropInputs"ステージに関して、あなたはそれを無視することができます。それはthisバグレポートの結果でした。私はそれがなぜ表示される必要があるのか​​まだ分かりません(それは多くのユーザーも混乱しています)。私はGoogle社員がそのことを訴えたいと思っています。私の意見では、それはちょうど混乱です。

右は、今、あなたのコードに:

  1. あなたは、最初の行の読み取りがあなたのヘッダーになることを想定しています。これは間違った前提です。データフローは並列に読み込まれるため、ヘッダー行はいつでも到着する可能性があります。 booleanフラグを使用して確認する代わりに、ParDoで毎回stringの値自体を確認してください。 if (c.element.contains("GroupName") then..
  2. BigQueryテーブルスキーマがありません。 BigQueryシンクにwithSchema(..)を追加する必要があります。私のパブリックパイプラインの一つであるexampleがあります。
+0

ありがとうございます。既存のテーブルにBigQueryを書きたい場合はどうすればいいですか?「withSchema(..)」を追加せずに書き込む必要がありますか?これはcsvの例ですが、私は約500のcollumnを持っていて、常にスキーマを追加するのは簡単ではないいくつかのCSVファイルに対してこの作業を行うべきです。ちなみに 、CSVを読み、BigQueryに書き込む例がありますか? – Majico

+0

私はCSVファイルをBQに書き込むことに成功しましたが、唯一の問題は "withSchema"を使用せずにBQに書き込むことです。コードを書き込む代わりにBQから取得する簡単な方法です。 – Majico

+1

別の質問をしてください。 – jkff

関連する問題