
I'm running a simple pipeline (a ParDo) that reads from GCS and writes the results to BigQuery. It autoscales up to 50 VMs, runs on GCP, and is otherwise unremarkable. Why is the write to BigQuery from my Dataflow/Beam pipeline so slow?

Reading all the data from GCS (~10B records, 700+ GB) and transforming it happens relatively quickly (within the first 7-10 minutes).

But when it gets to the BigQuery write (using BigQueryIO), it slows right down, even though it only has to write about 1M records (~60 MB). This step alone takes about 20 minutes.

The graph shows the write to BigQuery not only being slow but essentially "stalled" (it crawls), even though it eventually succeeds. The step also looks far too complex for a simple write to BigQuery (see the screenshot below).

The bottleneck seems to be when it reaches the step Executing operation BigQueryIO.Write/BatchLoads/WriteRename (see the logs below).

Am I doing something wrong in my code?

Code:

import static org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED;
import static org.apache.beam.sdk.io.TextIO.CompressionType.GZIP;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

// Renamed from "Pipeline" so it doesn't shadow org.apache.beam.sdk.Pipeline.
public class MelbournePipeline {
    private static final String BIG_QUERY_TABLE = "<redacted>:<redacted>.melbourne_titles";
    private static final String BUCKET = "gs://<redacted>/*.gz";

    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(DataflowPipelineOptions.class);
        options.setAutoscalingAlgorithm(THROUGHPUT_BASED);
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply(TextIO.read().from(BUCKET).withCompressionType(GZIP))
            .apply(ParDo.of(new DoFn<String, TableRow>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    String input = c.element();
                    // Assumes each record has at least 6 comma-separated fields.
                    String title = input.split(",")[5];
                    if (title.toLowerCase().contains("melbourne")) {
                        TableRow tableRow = new TableRow();
                        tableRow.set("title", title);
                        c.output(tableRow);
                    }
                }
            }))
            .apply(BigQueryIO.writeTableRows()
                .to(BIG_QUERY_TABLE)
                .withCreateDisposition(CREATE_IF_NEEDED)
                .withWriteDisposition(WRITE_TRUNCATE)
                .withSchema(getSchema()));
        pipeline.run();
    }

    private static TableSchema getSchema() {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("title").setType("STRING"));
        return new TableSchema().setFields(fields);
    }
}
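As an aside, the row filter inside the DoFn can be exercised in isolation, without Beam. A minimal plain-Java sketch of the same logic (class and method names are my own, and a length guard is added for short rows):

```java
public class TitleFilter {
    // Extracts the title (6th comma-separated field) from a pageview record,
    // or returns null if the row has too few fields.
    static String extractTitle(String line) {
        String[] fields = line.split(",");
        return fields.length > 5 ? fields[5] : null;
    }

    // Mirrors the DoFn's predicate: keep only titles mentioning "melbourne".
    static boolean isMelbourneTitle(String title) {
        return title != null && title.toLowerCase().contains("melbourne");
    }

    public static void main(String[] args) {
        String row = "2017,08,25,wikipedia,en,Melbourne_Cup,1234";
        System.out.println(isMelbourneTitle(extractTitle(row)));          // prints "true"
        System.out.println(isMelbourneTitle(extractTitle("short,row")));  // prints "false"
    }
}
```

Note that `input.split(",")[5]` in the pipeline will throw on any row with fewer than six fields; the guard above sidesteps that.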

Log excerpt:

    2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/BatchViewOverrides.GroupByWindowHas... 
    2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/Distinct Keys/Combine.perKey(Anonym... 
    2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/GroupByKey/Create 
    2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowH... 
    2017-08-25 (21:30:23) Starting 50 workers in australia-southeast1-a... 
    2017-08-25 (21:30:23) Executing operation TextIO.Read/Read+ParDo(Anonymous)+BigQueryIO.Write/PrepareWrite/ParDo(Anonymous)... 
    2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/TriggerIdCreation/Read(CreateSource)+BigQueryIO.Writ... 
    2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/Create/Read(CreateSource)+BigQueryIO.Write/BatchLoad... 
    2017-08-25 (21:31:21) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/BatchViewOverrides.GroupByWindowHas... 
    2017-08-25 (21:31:21) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/BatchViewOverrides.GroupByWindowHas... 
    2017-08-25 (21:31:22) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowH... 
    2017-08-25 (21:31:23) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowH... 
    2017-08-25 (21:38:10) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/CreateDataflowView 
    2017-08-25 (21:38:13) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/CreateDataflowView 
    2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/Distinct Keys/Combine.perKey(Anonym... 
    2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/GroupByKey/Close 
    2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForData/BatchViewOverri... 
    2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/GroupByKey/Read+BigQueryIO.Write/BatchLoads/GroupByK... 
    2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/Distinct Keys/Combine.perKey(Anonym... 
    2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForData/BatchViewOverri... 
    2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForSize/Create 
    2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForKeys/Create 
    2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForData/BatchViewOverri... 
    2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForSize/Close 
    2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForKeys/Close 
    2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForSize/Read+BigQueryIO... 
    2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForKeys/Read+BigQueryIO... 
    2017-08-25 (21:39:00) Executing operation s35-u80 
    2017-08-25 (21:39:01) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/Flatten.PCollections 
    2017-08-25 (21:39:03) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/CreateDataflowView 
    2017-08-25 (21:39:06) Executing operation BigQueryIO.Write/BatchLoads/ResultsView/CreateDataflowView 
    2017-08-25 (21:39:12) Executing operation BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Create 
    2017-08-25 (21:39:12) Executing operation BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/GroupByKey/Create 
    2017-08-25 (21:39:12) Executing operation BigQueryIO.Write/BatchLoads/Create.Values/Read(CreateSource)+BigQueryIO.Write/Ba... 
    2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Close 
    2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/GroupByKey/Close 
    2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Read+BigQueryIO... 
    2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/BatchViewOverrides.GroupByWindowHashA... 
    2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/GroupByKey/Read+BigQueryIO.... 
    2017-08-25 (21:39:35) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/BatchViewOverrides.GroupByWindowHashA... 
    2017-08-25 (21:39:35) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/BatchViewOverrides.GroupByWindowHashA... 
    2017-08-25 (21:39:46) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/CreateDataflowView 
    2017-08-25 (21:39:46) Executing operation BigQueryIO.Write/BatchLoads/WriteRename 
    2017-08-25 (21:57:43) Stopping worker pool... 

The step that looks overly complex:


Job details:

  • Dataflow Java SDK: 2.0.0
  • Job ID: 2017-08-25_04_29_54-7210937293145071720

UPDATE

I think the problem is the number of files Dataflow is generating, which BigQuery then has to load. It may only be ~1M rows, but Dataflow is producing 850+ files for the load job:

"configuration" : {
  "load" : {
    "createDisposition" : "CREATE_IF_NEEDED",
    "destinationTable" : {
      "datasetId" : "dataflow_on_a_tram",
      "projectId" : "grey-sort-challenge",
      "tableId" : "melbourne_titles"
    },
    "schema" : {
      "fields" : [ {
        "name" : "year",
        "type" : "STRING"
      }, {
        "name" : "month",
        "type" : "STRING"
      }, {
        "name" : "day",
        "type" : "STRING"
      }, {
        "name" : "wikimedia_project",
        "type" : "STRING"
      }, {
        "name" : "language",
        "type" : "STRING"
      }, {
        "name" : "title",
        "type" : "STRING"
      }, {
        "name" : "views",
        "type" : "INTEGER"
      } ]
    },
    "sourceFormat" : "NEWLINE_DELIMITED_JSON",
    "sourceUris" : [
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/51221a43-8fd8-417d-90ca-2f3c3e5789d2", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/5e1c3cb8-20d1-45ef-b0bb-209645c36093", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/0ed8d240-2bc2-4c8b-808d-792540448c73", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/d7a1fefe-6dd8-4f30-bf97-040c3692e448", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/b7c4d9a8-d45d-4cc6-b375-291e6435ed53", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/17a7bbf4-5695-4188-b03a-3ef5cda8607c", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/783af461-c114-4a41-aa5f-ed1c7db86bab", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/dad046fc-eabf-4212-83f1-7d7fa71075c1", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/7b9ffec1-7424-4248-83b4-98a4ef4233b9", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/bb297232-8e84-4a14-9dc6-3efde1b2b586", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/0693972a-1319-4637-af9f-8a4a3d5cb0f7", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/41b1e722-f76c-404d-a71b-bd36c09e8a06", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/19cfd89e-c9ee-4221-aee1-b3503dbcd93b", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/574467f2-5771-479a-b213-2941225a24bd", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/4d872304-0f42-47f2-89cf-b3a3f856ca67", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/1c086246-8eec-4bbe-be98-b01abb181d33", 
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/9439f5f4-5020-471d-b631-e1a3fea1584f", 

[..]851 files!
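Back-of-the-envelope arithmetic with the question's own figures (a sketch, not measured) shows how small each load file is, which suggests the load job is dominated by per-file overhead rather than data volume:

```java
public class LoadFileMath {
    public static void main(String[] args) {
        long totalBytes = 60L * 1024 * 1024; // ~60 MB written, per the question
        long totalRows = 1_000_000L;         // ~1M output rows, per the question
        long files = 851;                    // temp files Dataflow produced

        System.out.println("avg KB per file:   " + totalBytes / files / 1024); // ~72
        System.out.println("avg rows per file: " + totalRows / files);         // ~1175
    }
}
```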


Looking at the worker logs, it seems to be waiting for the BigQuery load job to complete. The BQ job may simply have been slower than usual. Does this problem happen on every run? – Pablo


I ran it again. Same problem: 2017-08-25_14_26_18-5377718284053913263 –


@Pablo - see my update. I think it's slow because Dataflow is generating so many files for BigQuery to load. –

Answer


One thing to keep in mind: BigQuery does not guarantee latency for load jobs. If many other load jobs are issued at the same time, yours may sit in a queue waiting to be scheduled. If you can run the job again, inspect the BigQuery load jobs themselves to see what is going on.
