2017-07-15 33 views
1

Dataflow/Beam SDK 2.0.0で空のサイド出力がNPEを投げる

データフロー/ビームパイプラインを2.0.0-beta3から2.0.0に移行しようとしています。

ただし、2.0.0バージョンを使用すると、パイプラインはデータフロー/ビームAPIのNPEの深さで失敗します。 2.0.0-beta3に戻って、もう一度正常に動作します。

コードに加えられた変更は、2.0.0 SDKのAPI変更を組み込むことだけです。私たちは何も変えていない。この問題は、サイドアウトプットが空の場合に発生します。空のサイド出力は2.0.0-beta3で正常に動作します。

2.0.0への移行で何か問題がありますか?

これは、問題を再現する例です。

--project=<project-id> 
--runner=DirectRunner 
--tempLocation=gs://<your-bucket> 
--stagingLocation=gs://<your-bucket> 

2.0.0-ベータ3(正常に動作)

/**
 * Reproduction case for an NPE in Beam/Dataflow when a tagged (side) output
 * receives no elements. This is the 2.0.0-beta3 variant, which runs fine.
 *
 * <p>Reads a public BigQuery table, fans each row out to a main output and two
 * side outputs (one of which is deliberately left empty), then writes each
 * resulting PCollection to its own BigQuery table.
 */
public class EmptySideOutputNPE implements Serializable { 
    // Main output tag: one row per input, carrying the "title" column.
    private static final TupleTag<TableRow> mainOutputTag = new TupleTag<TableRow>("mainOutputTag") { 
    }; 
    // Side output tag that always receives elements (the "language" column).
    private static final TupleTag<TableRow> sideOutputTag = new TupleTag<TableRow>("sideOutputTag") { 
    }; 
    // Side output tag the DoFn never writes to — the empty output that triggers
    // the NPE under SDK 2.0.0 (works fine under 2.0.0-beta3).
    private static final TupleTag<TableRow> possibleEmptySideOutputTag = new TupleTag<TableRow>("possibleEmptySideOutputTag") { 
    }; 

    /**
     * Builds and runs the pipeline. Expects the standard Beam pipeline args
     * (--project, --runner, --tempLocation, --stagingLocation).
     */
    public static void main(String[] args) { 
     PipelineOptions options = PipelineOptionsFactory 
       .fromArgs(args) 
       .withValidation() 
       .as(PipelineOptions.class); 
     Pipeline pipeline = Pipeline.create(options); 
     //Read from BigQuery public dataset (2.0.0-beta3 API: BigQueryIO.Read.from)
     PCollectionTuple results = pipeline.apply("Read-BQ", BigQueryIO.Read.from("bigquery-samples:wikipedia_benchmark.Wiki1k")) 
       .apply(ParDo.of(new DoFn<TableRow, TableRow>() { 
        @ProcessElement 
        public void processElement(ProcessContext c) throws Exception { 
         TableRow inputRow = c.element(); 
         //output the title to the main output tag
         TableRow titleRow = new TableRow(); 
         titleRow.set("col", inputRow.get("title")); 
         c.output(titleRow); 

         //output the language to the side output (2.0.0-beta3 API: c.sideOutput)
         TableRow languageRow = new TableRow(); 
         languageRow.set("col", inputRow.get("language")); 
         c.sideOutput(sideOutputTag, languageRow); 

         //don't output anything for the possibleEmptySideOutputTag tag — left empty on purpose
        } 
       }).withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag).and(possibleEmptySideOutputTag))); 
     //write each tagged output to its own BigQuery table (2.0.0-beta3 API: BigQueryIO.Write)
     results.get(mainOutputTag).apply("Title write", 
       BigQueryIO.Write.to("<project-id>:<dataset>.2_0_0_sdk_test_title") 
         .withCreateDisposition(CREATE_IF_NEEDED) 
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 
         .withSchema(getTableSchema())); 
     results.get(sideOutputTag).apply("Language write", 
       BigQueryIO.Write.to("<project-id>:<dataset>.2_0_0_sdk_test_language") 
         .withCreateDisposition(CREATE_IF_NEEDED) 
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 
         .withSchema(getTableSchema())); 
     // Writing the never-populated output: succeeds on 2.0.0-beta3.
     results.get(possibleEmptySideOutputTag).apply("Empty write", 
       BigQueryIO.Write.to("<project-id>:<dataset>.2_0_0_sdk_test_empty") 
         .withCreateDisposition(CREATE_IF_NEEDED) 
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 
         .withSchema(getTableSchema())); 
     pipeline.run(); 
    } 

    /** Returns the one-column ("col": STRING) schema shared by all three sinks. */
    private static TableSchema getTableSchema() { 
     List<TableFieldSchema> fields = new ArrayList<>(); 
     fields.add(new TableFieldSchema().setName("col").setType("STRING")); 
     return new TableSchema().setFields(fields); 
    } 
} 

2.0.0(NPE)

/**
 * Same reproduction pipeline ported to Beam SDK 2.0.0 — this variant fails at
 * runtime with an NPE inside WriteTables when the extra tagged output stays empty.
 *
 * <p>Only the 2.0.0 API renames differ from the beta3 version:
 * BigQueryIO.read() replaces BigQueryIO.Read, c.output(tag, value) replaces
 * c.sideOutput(tag, value), and BigQueryIO.writeTableRows() replaces
 * BigQueryIO.Write. The pipeline logic is otherwise unchanged.
 */
public class EmptySideOutputNPE implements Serializable { 
    // Main output tag: one row per input, carrying the "title" column.
    private static final TupleTag<TableRow> mainOutputTag = new TupleTag<TableRow>("mainOutputTag") { 
    }; 
    // Side output tag that always receives elements (the "language" column).
    private static final TupleTag<TableRow> sideOutputTag = new TupleTag<TableRow>("sideOutputTag") { 
    }; 
    // Side output tag the DoFn never writes to — the empty output whose BigQuery
    // write triggers the NPE under SDK 2.0.0 (BEAM-2406).
    private static final TupleTag<TableRow> possibleEmptySideOutputTag = new TupleTag<TableRow>("possibleEmptySideOutputTag") { 
    }; 

    /**
     * Builds and runs the pipeline. Expects the standard Beam pipeline args
     * (--project, --runner, --tempLocation, --stagingLocation).
     */
    public static void main(String[] args) { 
     PipelineOptions options = PipelineOptionsFactory 
       .fromArgs(args) 
       .withValidation() 
       .as(PipelineOptions.class); 
     Pipeline pipeline = Pipeline.create(options); 
     //Read from BigQuery public dataset (2.0.0 API: BigQueryIO.read())
     PCollectionTuple results = pipeline.apply("Read-BQ", BigQueryIO.read().from("bigquery-samples:wikipedia_benchmark.Wiki1k")) 
       .apply(ParDo.of(new DoFn<TableRow, TableRow>() { 
        @ProcessElement 
        public void processElement(ProcessContext c) throws Exception { 
         TableRow inputRow = c.element(); 
         //output the title to the main output tag
         TableRow titleRow = new TableRow(); 
         titleRow.set("col", inputRow.get("title")); 
         c.output(titleRow); 

         //output the language to the side output (2.0.0 API: c.output(tag, value))
         TableRow languageRow = new TableRow(); 
         languageRow.set("col", inputRow.get("language")); 
         c.output(sideOutputTag, languageRow); 

         //don't output anything for the possibleEmptySideOutputTag tag — left empty on purpose
        } 
       }).withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag).and(possibleEmptySideOutputTag))); 
     //write each tagged output to its own BigQuery table (2.0.0 API: writeTableRows())
     results.get(mainOutputTag).apply("Title write", 
       BigQueryIO.writeTableRows().to("<project-id>:<dataset>.2_0_0_sdk_test_title") 
         .withCreateDisposition(CREATE_IF_NEEDED) 
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 
         .withSchema(getTableSchema())); 
     results.get(sideOutputTag).apply("Language write", 
       BigQueryIO.writeTableRows().to("<project-id>:<dataset>.2_0_0_sdk_test_language") 
         .withCreateDisposition(CREATE_IF_NEEDED) 
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 
         .withSchema(getTableSchema())); 
     // Writing the never-populated output: this is where 2.0.0 throws the NPE
     // (WriteTables.processElement, see stack trace below).
     results.get(possibleEmptySideOutputTag).apply("Empty write", 
       BigQueryIO.writeTableRows().to("<project-id>:<dataset>.2_0_0_sdk_test_empty") 
         .withCreateDisposition(CREATE_IF_NEEDED) 
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 
         .withSchema(getTableSchema())); 
     pipeline.run(); 
    } 

    /** Returns the one-column ("col": STRING) schema shared by all three sinks. */
    private static TableSchema getTableSchema() { 
     List<TableFieldSchema> fields = new ArrayList<>(); 
     fields.add(new TableFieldSchema().setName("col").setType("STRING")); 
     return new TableSchema().setFields(fields); 
    } 
} 
次の引数で実行します
23:43:09,484 0 [main] INFO org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase - Starting BigQuery extract job: beam_job_885a1329f1a045d6a6422c975690967e_emptysideoutputnpepolleyg0715134309b6259542-extract 
    23:43:11,209 1725 [main] INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl - Started BigQuery job: {jobId=beam_job_885a1329f1a045d6a6422c975690967e_emptysideoutputnpepolleyg0715134309b6259542-extract, projectId=<redacted>}. 
    bq show -j --format=prettyjson --project_id=<redacted> beam_job_885a1329f1a045d6a6422c975690967e_emptysideoutputnpepolleyg0715134309b6259542-extract 
    23:43:12,718 3234 [main] INFO org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase - BigQuery extract job completed: beam_job_885a1329f1a045d6a6422c975690967e_emptysideoutputnpepolleyg0715134309b6259542-extract 
    23:43:14,738 5254 [direct-runner-worker] INFO org.apache.beam.sdk.io.FileBasedSource - Matched 1 files for pattern gs://nonsense/BigQueryExtractTemp/885a1329f1a045d6a6422c975690967e/000000000000.avro 
    23:43:18,171 8687 [direct-runner-worker] INFO org.apache.beam.sdk.io.FileBasedSource - Filepattern gs://nonsense/BigQueryExtractTemp/885a1329f1a045d6a6422c975690967e/000000000000.avro matched 1 files with total size 60370 
    23:43:18,653 9169 [direct-runner-worker] INFO org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://nonsense/BigQueryWriteTemp/956c7d7b866941aaa406bd9e5cb63aab/399d59ec-2475-4d07-9fa9-25feadf53737. 
    23:43:18,653 9169 [direct-runner-worker] INFO org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://nonsense/BigQueryWriteTemp/4377160da6184249a5ffc7cc27155265/8db1d8c4-9e4d-4093-8b9f-3e892de78057. 
    23:43:22,839 13355 [direct-runner-worker] INFO org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://nonsense/BigQueryWriteTemp/956c7d7b866941aaa406bd9e5cb63aab/1b544d4b-650c-4e05-abc0-f80318278a2f. 
    23:43:22,849 13365 [direct-runner-worker] INFO org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://nonsense/BigQueryWriteTemp/4377160da6184249a5ffc7cc27155265/2f3164e0-674e-4926-925f-678657587e75. 
    23:43:27,428 17944 [direct-runner-worker] INFO org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://nonsense/BigQueryWriteTemp/4377160da6184249a5ffc7cc27155265/b0d8ae7a-e6b0-48ac-a0a1-fd3e0fa17f75. 
    23:43:27,434 17950 [direct-runner-worker] INFO org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://nonsense/BigQueryWriteTemp/956c7d7b866941aaa406bd9e5cb63aab/b77b17e3-562c-47b0-8a6c-ee8eb7745fc8. 
    23:43:33,242 23758 [direct-runner-worker] INFO org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://nonsense/BigQueryWriteTemp/1f559dd752eb43f7bd1af1c881c21235/a8e51a20-408d-4628-abf3-bbdb2ebd9527. 
    23:43:35,046 25562 [direct-runner-worker] INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl - Started BigQuery job: {jobId=956c7d7b866941aaa406bd9e5cb63aab_e9f0a5890698d99399a6106c26d65de2_00001-0, projectId=<redacted>}. 
    bq show -j --format=prettyjson --project_id=<redacted> 956c7d7b866941aaa406bd9e5cb63aab_e9f0a5890698d99399a6106c26d65de2_00001-0 
    23:43:35,126 25642 [direct-runner-worker] INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl - Started BigQuery job: {jobId=4377160da6184249a5ffc7cc27155265_a6c30233d929e6958a536246c31fe3d1_00001-0, projectId=<redacted>}. 
    bq show -j --format=prettyjson --project_id=<redacted> 4377160da6184249a5ffc7cc27155265_a6c30233d929e6958a536246c31fe3d1_00001-0 
    Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException 
     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322) 
     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292) 
     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200) 
     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63) 
     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295) 
     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281) 
     at com.pipelines.EmptySideOutputNPE.main(EmptySideOutputNPE.java:85) 
    Caused by: java.lang.NullPointerException 
     at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.processElement(WriteTables.java:97) 

観察

  1. パイプラインからpossibleEmptySideOutputTagを削除する(すなわち .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) とする)と、2.0.0でも正常に動作します。
  2. ParDoの中でpossibleEmptySideOutputTagに1行以上を出力するようにすると、2.0.0でも正常に動作します。

答えて

1

https://issues.apache.org/jira/browse/BEAM-2406は修正されており、修正はHEADまたは今後の2.1.0リリースで利用できます。

+0

私は元の質問にコメントしたので、これを見つけることができたはずです! 100万のおかげで@ jkff。 2.0.1のETAはありますか? –

+0

2.1.0のリリースプロセスは既に開始されているので、おそらく1〜2週間以内にリリースされると思います。 – jkff

+0

ブリリアント。ありがとうございました。 –

関連する問題