
I am getting the DATA_LOSS exception below from Google Cloud Dataflow. I have 10 to 15 JSON files (roughly 2-3 MB per file). I parse the files with Jackson 2, apply transforms with ParDo(), and finally remove duplicate items with a GroupByKey. Can you help me figure out what I am doing wrong?

It runs fine with the DirectPipelineRunner.

2016-05-11T13:06:31.277Z: Detail: (eb15ba3070c2acbc): Checking required Cloud APIs are enabled. 
2016-05-11T13:06:31.637Z: Detail: (eb15ba3070c2abc7): Expanding GroupByKey operations into optimizable parts. 
2016-05-11T13:06:31.640Z: Detail: (eb15ba3070c2a6b5): Lifting ValueCombiningMappingFns into MergeBucketsMappingFns 
2016-05-11T13:06:31.646Z: Detail: (eb15ba3070c2a77f): Annotating graph with Autotuner information. 
2016-05-11T13:06:31.732Z: Detail: (eb15ba3070c2a5c0): Fusing adjacent ParDo, Read, Write, and Flatten operations 
2016-05-11T13:06:31.735Z: Detail: (eb15ba3070c2a0ae): Fusing consumer ParDo(ParserEdition) into ReadEditions4GCS 
2016-05-11T13:06:31.737Z: Detail: (eb15ba3070c2ab9c): Fusing consumer ParDo(GetRelatedArticles) into ParDo(FlattenArticles) 
2016-05-11T13:06:31.739Z: Detail: (eb15ba3070c2a68a): Fusing consumer GroupByKey/GroupByWindow into GroupByKey/Read 
2016-05-11T13:06:31.741Z: Detail: (eb15ba3070c2a178): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/Ungroup into Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/GroupByWindow 
2016-05-11T13:06:31.743Z: Detail: (eb15ba3070c2ac66): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/GroupByWindow into Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Read 
2016-05-11T13:06:31.745Z: Detail: (eb15ba3070c2a754): Fusing consumer Write2Gcs/Write2Gcs into Write2Gcs/FileBasedSink.ReshardForWrite/Ungroup 
2016-05-11T13:06:31.747Z: Detail: (eb15ba3070c2a242): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Write into Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Reify 
2016-05-11T13:06:31.750Z: Detail: (eb15ba3070c2ad30): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Reify into Write2Gcs/FileBasedSink.ReshardForWrite/RandomKey 
2016-05-11T13:06:31.752Z: Detail: (eb15ba3070c2a81e): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/RandomKey into Write2Gcs/FileBasedSink.ReshardForWrite/Window.Into() 
2016-05-11T13:06:31.754Z: Detail: (eb15ba3070c2a30c): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/Window.Into() into ParDo(Article2CSV) 
2016-05-11T13:06:31.757Z: Detail: (eb15ba3070c2adfa): Fusing consumer ParDo(Article2CSV) into AnonymousParDo 
2016-05-11T13:06:31.759Z: Detail: (eb15ba3070c2a8e8): Fusing consumer GroupByKey/Write into GroupByKey/Reify 
2016-05-11T13:06:31.761Z: Detail: (eb15ba3070c2a3d6): Fusing consumer AnonymousParDo into GroupByKey/GroupByWindow 
2016-05-11T13:06:31.763Z: Detail: (eb15ba3070c2aec4): Fusing consumer GroupByKey/Reify into ParDo(Article2KV) 
2016-05-11T13:06:31.765Z: Detail: (eb15ba3070c2a9b2): Fusing consumer ParDo(FlattenArticles) into ParDo(ParserEdition) 
2016-05-11T13:06:31.768Z: Detail: (eb15ba3070c2a4a0): Fusing consumer ParDo(Article2KV) into ParDo(GetRelatedArticles) 
2016-05-11T13:06:31.815Z: Basic: (eb15ba3070c2aa26): Worker configuration: n1-standard-1 in us-central1-f. 
2016-05-11T13:06:32.154Z: Detail: (eb15ba3070c2a931): Adding StepResource setup and teardown to workflow graph. 
2016-05-11T13:06:32.262Z: Basic: (120e40c18a94ee3a): Starting 3 workers... 
2016-05-11T13:06:32.272Z: Basic: S01: (b31e9392dace1359): Executing operation GroupByKey/Create 
2016-05-11T13:06:32.504Z: Basic: S02: (27044e90035e1dd6): Executing operation ReadEditions4GCS+ParDo(ParserEdition)+ParDo(FlattenArticles)+ParDo(GetRelatedArticles)+ParDo(Article2KV)+GroupByKey/Reify+GroupByKey/Write 
2016-05-11T13:07:11.352Z: Detail: (e26d7dfd74bb5700): Workers have started successfully. 
2016-05-11T13:07:23.464Z: Error: (91724060ab73dbcb): java.io.IOException: DATA_LOSS: Inconsistent number of records, parsed 108, expected 109 when dataflow-articlemetadatapipeline-g-05110606-31f5-harness-cmwd talking to tcp://localhost:12345 
    at com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter.write(Native Method) 
    at com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter.writeChunk(ChunkingShuffleEntryWriter.java:72) 
    at com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter.close(ChunkingShuffleEntryWriter.java:66) 
    at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.close(ShuffleSink.java:272) 
    at com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.finish(WriteOperation.java:100) 
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:254) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:191) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:144) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:180) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:161) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:148) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

I have run the same code several times, and I also get slightly different exceptions:

2016-05-11T13:00:27.649Z: Detail: (7ad6fdbb36cc3e7a): Checking required Cloud APIs are enabled. 
2016-05-11T13:00:27.994Z: Detail: (7ad6fdbb36cc3ed9): Expanding GroupByKey operations into optimizable parts. 
2016-05-11T13:00:27.998Z: Detail: (7ad6fdbb36cc350f): Lifting ValueCombiningMappingFns into MergeBucketsMappingFns 
2016-05-11T13:00:28.009Z: Detail: (7ad6fdbb36cc37b1): Annotating graph with Autotuner information. 
2016-05-11T13:00:28.106Z: Detail: (7ad6fdbb36cc356e): Fusing adjacent ParDo, Read, Write, and Flatten operations 
2016-05-11T13:00:28.110Z: Detail: (7ad6fdbb36cc3ba4): Fusing consumer ParDo(ParserEdition) into ReadEditions4GCS 
2016-05-11T13:00:28.112Z: Detail: (7ad6fdbb36cc31da): Fusing consumer ParDo(GetRelatedArticles) into ParDo(FlattenArticles) 
2016-05-11T13:00:28.114Z: Detail: (7ad6fdbb36cc3810): Fusing consumer GroupByKey/GroupByWindow into GroupByKey/Read 
2016-05-11T13:00:28.117Z: Detail: (7ad6fdbb36cc3e46): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/Ungroup into Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/GroupByWindow 
2016-05-11T13:00:28.120Z: Detail: (7ad6fdbb36cc347c): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/GroupByWindow into Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Read 
2016-05-11T13:00:28.124Z: Detail: (7ad6fdbb36cc3ab2): Fusing consumer Write2Gcs/Write2Gcs into Write2Gcs/FileBasedSink.ReshardForWrite/Ungroup 
2016-05-11T13:00:28.127Z: Detail: (7ad6fdbb36cc30e8): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Write into Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Reify 
2016-05-11T13:00:28.129Z: Detail: (7ad6fdbb36cc371e): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Reify into Write2Gcs/FileBasedSink.ReshardForWrite/RandomKey 
2016-05-11T13:00:28.132Z: Detail: (7ad6fdbb36cc3d54): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/RandomKey into Write2Gcs/FileBasedSink.ReshardForWrite/Window.Into() 
2016-05-11T13:00:28.135Z: Detail: (7ad6fdbb36cc338a): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/Window.Into() into ParDo(Article2CSV) 
2016-05-11T13:00:28.137Z: Detail: (7ad6fdbb36cc39c0): Fusing consumer ParDo(Article2CSV) into AnonymousParDo 
2016-05-11T13:00:28.139Z: Detail: (7ad6fdbb36cc3ff6): Fusing consumer GroupByKey/Write into GroupByKey/Reify 
2016-05-11T13:00:28.141Z: Detail: (7ad6fdbb36cc362c): Fusing consumer AnonymousParDo into GroupByKey/GroupByWindow 
2016-05-11T13:00:28.144Z: Detail: (7ad6fdbb36cc3c62): Fusing consumer GroupByKey/Reify into ParDo(Article2KV) 
2016-05-11T13:00:28.146Z: Detail: (7ad6fdbb36cc3298): Fusing consumer ParDo(FlattenArticles) into ParDo(ParserEdition) 
2016-05-11T13:00:28.148Z: Detail: (7ad6fdbb36cc38ce): Fusing consumer ParDo(Article2KV) into ParDo(GetRelatedArticles) 
2016-05-11T13:00:28.196Z: Basic: (7ad6fdbb36cc3b3c): Worker configuration: n1-standard-1 in us-central1-f. 
2016-05-11T13:00:28.459Z: Detail: (7ad6fdbb36cc3b9b): Adding StepResource setup and teardown to workflow graph. 
2016-05-11T13:00:28.639Z: Basic: (cea9ab4bd124bf89): Starting 3 workers... 
2016-05-11T13:00:28.658Z: Basic: S01: (e5a53851aa035056): Executing operation GroupByKey/Create 
2016-05-11T13:00:28.896Z: Basic: S02: (5803a8f4cae47397): Executing operation ReadEditions4GCS+ParDo(ParserEdition)+ParDo(FlattenArticles)+ParDo(GetRelatedArticles)+ParDo(Article2KV)+GroupByKey/Reify+GroupByKey/Write 
2016-05-11T13:01:12.228Z: Detail: (5d4a90d7ea1437dd): Workers have started successfully. 
2016-05-11T13:01:22.911Z: Error: (f5a249985c78da4a): com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: unable to parse secondary key 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:193) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:213) 
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53) 
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487) 
    at uk.news.pipeline.api.ArticleMetaDataPipeline$Article2KV.processElement(ArticleMetaDataPipeline.java:147) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:213) 
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53) 
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487) 
    at uk.news.pipeline.api.ArticleMetaDataPipeline$GetRelatedArticles.lambda$processElement$0(ArticleMetaDataPipeline.java:71) 
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) 
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374) 
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512) 
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) 
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) 
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1689) 
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) 
Caused by: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: unable to parse secondary key 
    at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables.propagate(Throwables.java:160) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:176) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487) 
    at com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn.processElement(ReifyTimestampAndWindowsDoFn.java:38) 
Caused by: java.io.IOException: INVALID_ARGUMENT: unable to parse secondary key 
    at com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter.write(Native Method) 
    at com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter.writeChunk(ChunkingShuffleEntryWriter.java:72) 
    at com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter.put(ChunkingShuffleEntryWriter.java:56) 
    at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.add(ShuffleSink.java:263) 
    at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.add(ShuffleSink.java:169) 
    at com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.process(WriteOperation.java:90) 
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487) 
    at com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn.processElement(ReifyTimestampAndWindowsDoFn.java:38) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:213) 
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53) 
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487) 
    at uk.news.pipeline.api.ArticleMetaDataPipeline$Article2KV.processElement(ArticleMetaDataPipeline.java:147) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:213) 
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53) 
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487) 
    at uk.news.pipeline.api.ArticleMetaDataPipeline$GetRelatedArticles.lambda$processElement$0(ArticleMetaDataPipeline.java:71) 
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) 
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374) 
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512) 
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) 
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) 
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1689) 
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) 

2016-05-11T13:01:25.776Z: Error: (e9a78cb2969ddea0): java.lang.RuntimeException: java.io.IOException: DATA_LOSS: Inconsistent number of records, parsed 97, expected 98 when dataflow-articlemetadatapipeline-g-05110600-a71d-harness-p0a2 talking to tcp://localhost:12345 
    at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables.propagate(Throwables.java:160) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:176) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487) 
    at com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn.processElement(ReifyTimestampAndWindowsDoFn.java:38) 
Caused by: java.io.IOException: DATA_LOSS: Inconsistent number of records, parsed 97, expected 98 when dataflow-articlemetadatapipeline-g-05110600-a71d-harness-p0a2 talking to tcp://localhost:12345 
    at com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter.write(Native Method) 
    at com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter.writeChunk(ChunkingShuffleEntryWriter.java:72) 
    at com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter.put(ChunkingShuffleEntryWriter.java:56) 
    at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.add(ShuffleSink.java:263) 
    at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.add(ShuffleSink.java:169) 
    at com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.process(WriteOperation.java:90) 
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487) 
    at com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn.processElement(ReifyTimestampAndWindowsDoFn.java:38) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:213) 
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53) 
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487) 
    at uk.news.pipeline.api.ArticleMetaDataPipeline$Article2KV.processElement(ArticleMetaDataPipeline.java:147) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:213) 
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53) 
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487) 
    at uk.news.pipeline.api.ArticleMetaDataPipeline$GetRelatedArticles.processElement(ArticleMetaDataPipeline.java:69) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189) 


Code:

static class ParserEdition extends DoFn<String, Edition> { 
     @Override 
     public void processElement(ProcessContext c) throws Exception { 
      final String editionStr = c.element(); 
      ObjectMapper mapper = new ObjectMapper(); 
      ObjectReader reader = mapper.reader(Edition.class); 
      final Object editionObj = reader.readValue(editionStr); 
      c.output((Edition)editionObj); 
     } 
    } 

    static class FlattenArticles extends DoFn<Edition,Article>{ 

     @Override 
     public void processElement(ProcessContext c) throws Exception { 
      final List<Article> articleList = c.element().getArticleList(); 
      for (Article a : articleList){ 
       c.output(a); 
      } 
     } 
    } 

    static class GetRelatedArticles extends DoFn<Article, Article>{ 

     @Override 
     public void processElement(ProcessContext c) throws Exception { 

      final Article tArticle = c.element(); 
      if(tArticle.getCategory().equals("article")){ 
       Article cloneArticle = (Article)SerializationUtils.clone(tArticle); 
       cloneArticle.setImage(getRelatedImage(tArticle)); 
       c.output(cloneArticle); 
       final List<Article> relateArticle = getRelateArticle(tArticle, 5); 
       relateArticle.parallelStream().forEach(a -> c.output(a)); 
      } 
     } 

     public List<Article> getRelateArticle(Article art, int i){ 
      List<Article> list = new ArrayList<>(); 
      if(i <= 0 || art.getArticleList() == null){ 
       return null; 
      }else { 
       for(Article a : art.getArticleList()) { 
        if (a.getCategory().equals("article")) { 
         Article cloneArticle = (Article)SerializationUtils.clone(a); 
         cloneArticle.setImage(getRelatedImage(a)); 
         list.add(cloneArticle); 
         final List<Article> relateArticle = getRelateArticle(a, i - 1); 
         if (relateArticle != null) { 
          list.addAll(relateArticle); 
         } 
        } 
       } 
      } 
      return list; 
     } 

     public Image getRelatedImage(Article art){ 
      Image image = new Image(); 
      try{ 
       final Article article = art.getArticleList().parallelStream().filter(
         a -> (a.getCategory().equals("image") && a.getIdentifier().equals(art.getLeadAssetId()))) 
         .findFirst().get(); 
       if(article!=null){ 
        image.setId(article.getIdentifier()); 
        image.setImageUrl(URLEncoder.encode(article.getCrops().get(0).getImageId(), Charset.defaultCharset().name())); 
       } 
      }catch (Exception e){   } 
      return image; 
     } 
    } 

    static class Article2CSV extends DoFn<Article,String>{ 

     private String delimiter; 

     Article2CSV(String delimiter){ 
      this.delimiter = delimiter; 
     } 

     @Override 
     public void processElement(ProcessContext c) throws Exception { 
      final Article a = c.element(); 
      String str = a.getIdentifier()+delimiter+a.getTitle() +delimiter+getTeaserText(a) + 
        delimiter+a.getPublished() +delimiter+ a.getLeadAssetId() + 
        delimiter+ a.getImage().getImageUrl(); 
      c.output(str); 
     } 

     private String getTeaserText(Article a){ 
      String teaser = ""; 
      if(!a.getContent().isEmpty()){ 
       for(Content c : a.getContent()){ 
        if(teaser.length() <= 100){ 
         teaser = teaser + c.getData().getText(); 
        } 
       } 
      } 
      return teaser; 
     } 
    } 


    static class Article2KV extends DoFn<Article, KV<String, Article>> { 
     @Override 
     public void processElement(ProcessContext c) throws Exception { 
      final Article art = c.element(); 
      if(art!=null && !StringUtils.isBlank(art.getIdentifier())) 
       c.output(KV.of(art.getIdentifier(),art)); 
     } 
    } 

........ 


     PipelineOptionsFactory.register(ArticleMetaDataOptions.class); 
     DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(ArticleMetaDataOptions.class); 

     final ArticleMetaDataOptions opts = (ArticleMetaDataOptions) options; 
     if (!opts.isTestMode()) 
      options.setRunner(BlockingDataflowPipelineRunner.class); 

     options.setDefaultWorkerLogLevel(DataflowWorkerLoggingOptions.Level.DEBUG); 
     Pipeline p = Pipeline.create(options); 

     final PCollection<String> edition4GCS = p.apply(TextIO.Read.named("ReadEditions4GCS") 
       .from("gs://editions-newsuk/*")); 

     // get articles from all the editions 
     final PCollection<Article> articlePCollection = edition4GCS.apply(ParDo.of(new ParserEdition())).apply(ParDo.of(new FlattenArticles())); 

     // get related articles 
     final PCollection<Article> articles = articlePCollection.apply(ParDo.of(new GetRelatedArticles())); 

     // convert into KV 
     final PCollection<KV<String, Article>> articlesKV = articles.apply(ParDo.of(new Article2KV())); 

     // Group by key *** if the line below is commented out, the pipeline always works... 
     final PCollection<KV<String, Iterable<Article>>> groupByCollection = articlesKV.apply(GroupByKey.<String, Article>create()); 


     // filter the duplicate/partial articles 
     PCollection<Article> filterArticles = groupByCollection.apply(ParDo.of(new DoFn<KV<String, Iterable<Article>>, Article>() { 
        public void processElement(ProcessContext c) { 
         String articleId = c.element().getKey(); 
         Iterable<Article> arts = c.element().getValue(); 
         boolean found = false; 
         Article article = null; 
         if(arts!=null){ 
          for(Article at : arts){ 
           article = at; 
           if(at!=null && !StringUtils.isBlank(at.getImage().getImageUrl())){ 
            found = true; 
            c.output(at); 
            break; 
           } 
          } 
          if(!found){ 
           c.output(article); 
          } 
         } 
        }})); 


     // transform into file and persist to GCS 
     filterArticles.apply(ParDo.of(new Article2CSV(opts.getDelimiter()))).apply(TextIO.Write.named("Write2Gcs").withoutSharding().to(opts.getOutputLocation())); 

Could you share a bit more of your 'DoFn' code? For example, 'ParserEdition', 'FlattenArticles', 'GetRelatedArticles', 'Article2KV', or a high-level summary of each. If you can also share the job IDs associated with these runs, we can take a closer look. –


Job IDs: 2016-05-11_05_39_43-14925035554635753922, 2016-05-11_05_46_24-2654288952822529571, 2016-05-11_06_00_26-8122161173741086647, 2016-05-11_06_06_30-17852890725098926978, 2016-05-11_06_20_15-7642311360028712413 – gana


@BenChambers, thanks. I have updated the comments above with the code and job IDs. – gana

Answer

Calls to DoFn.Context#output must be synchronized with, and complete before, the return of the associated method (processElement, startBundle, or finishBundle).

In the code you shared, it looks like you were using someList.parallelStream().forEach(e -> c.output(e)) to output elements. The use of parallelStream violates that requirement.

Using a regular (non-parallel) forEach should prevent these problems.
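
For illustration, here is a minimal sketch (assuming the same Article type and the getRelateArticle/getRelatedImage helpers from the question) of how GetRelatedArticles could be rewritten so that every c.output() call runs on the thread that invoked processElement and completes before processElement returns:

    static class GetRelatedArticles extends DoFn<Article, Article> {

     @Override
     public void processElement(ProcessContext c) throws Exception {
      final Article tArticle = c.element();
      if (tArticle.getCategory().equals("article")) {
       Article cloneArticle = (Article) SerializationUtils.clone(tArticle);
       cloneArticle.setImage(getRelatedImage(tArticle));
       c.output(cloneArticle);
       final List<Article> relatedArticles = getRelateArticle(tArticle, 5);
       // Null check added here: getRelateArticle can return null when the
       // article has no child list.
       if (relatedArticles != null) {
        // Sequential iteration: unlike parallelStream(), every c.output()
        // call happens on this thread and finishes before processElement
        // returns.
        relatedArticles.forEach(c::output);
       }
      }
     }

     // getRelateArticle(...) and getRelatedImage(...) unchanged from the question.
    }

A plain for-each loop, like the one already used in FlattenArticles, would work just as well. The DATA_LOSS and INVALID_ARGUMENT shuffle errors in the logs are consistent with concurrent output calls from the ForkJoinPool threads backing parallelStream() corrupting the shuffle writer's record stream.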
