2017-01-04

PubsubReader fails with NullPointerException when run with InProcessPipelineRunner

I have a simple pipeline that reads from PubsubIO.Read.subscription. Every run fails after consuming about 200 elements, with the following exception:

[error] (run-main-0) java.lang.RuntimeException: java.lang.NullPointerException 
java.lang.RuntimeException: java.lang.NullPointerException 
    at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:281) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:69) 
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:181) 
    at com.sandbox.WriteLogsToBQ.main(WriteLogsToBQ.java:296) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
Caused by: java.lang.NullPointerException 
    at com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource$PubsubReader.ackBatch(PubsubUnboundedSource.java:612) 
    at com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource$PubsubCheckpoint.finalizeCheckpoint(PubsubUnboundedSource.java:297) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.startReader(UnboundedReadEvaluatorFactory.java:203) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.finishBundle(UnboundedReadEvaluatorFactory.java:172) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.TransformExecutor.finishBundle(TransformExecutor.java:163) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.TransformExecutor.run(TransformExecutor.java:119) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

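I am using SDK version 1.9.0. This does not happen with 1.6.1, which successfully fetched more than 10k elements. Does anyone know a workaround? I also noticed that 1.9.0 fetches much faster than 1.6.1; 1.6.1 appears to fetch in batches of 10 elements.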

p.apply(PubsubIO.Read.named("reading_topic_test").topic("projects/***/topics/SL_LogLogin")) 

I also tested reading from the topic, as in the call above. The subscription was created automatically, but the pipeline then failed:

Jan 05, 2017 2:06:33 PM com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource apply 
WARNING: Created subscription null to topic NestedValueProvider{value=NestedValueProvider{value=StaticValueProvider{value=projects/***/topics/SL_LogLogin}}}. 
Note this subscription WILL NOT be deleted when the pipeline terminates 
[error] (run-main-0) com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder$PopulateDisplayDataException: 
Error while populating display data for component:com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource$StatsFn 
com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder$PopulateDisplayDataException: 
Error while populating display data for component: com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource$StatsFn 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:664) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:643) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:637) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo.populateDisplayData(ParDo.java:1266) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo.access$200(ParDo.java:457) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo$Bound.populateDisplayData(ParDo.java:816) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:657) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:643) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:637) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.forRoot(DisplayData.java:630) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.access$000(DisplayData.java:617) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData.from(DisplayData.java:76) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.evaluateDisplayData(DisplayDataValidator.java:47) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.access$100(DisplayDataValidator.java:29) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator$Visitor.visitTransform(DisplayDataValidator.java:62) 
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221) 
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217) 
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217) 
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217) 
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:103) 
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:260) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.validateTransforms(DisplayDataValidator.java:43) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.validatePipeline(DisplayDataValidator.java:35) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:245) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:69) 
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:181) 
    at com.sandbox.WriteLogsToBQ.main(WriteLogsToBQ.java:303) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
Caused by: java.lang.NullPointerException: Input display value cannot be null 
    at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:228) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.addItemIf(DisplayData.java:707) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.add(DisplayData.java:685) 
    at com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource$StatsFn.populateDisplayData(PubsubUnboundedSource.java:1147) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:657) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:643) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:637) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo.populateDisplayData(ParDo.java:1266) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo.access$200(ParDo.java:457) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo$Bound.populateDisplayData(ParDo.java:816) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:657) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:643) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:637) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.forRoot(DisplayData.java:630) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.access$000(DisplayData.java:617) 
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData.from(DisplayData.java:76) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.evaluateDisplayData(DisplayDataValidator.java:47) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.access$100(DisplayDataValidator.java:29) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator$Visitor.visitTransform(DisplayDataValidator.java:62) 
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221) 
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217) 
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217) 
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217) 
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:103) 
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:260) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.validateTransforms(DisplayDataValidator.java:43) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.validatePipeline(DisplayDataValidator.java:35) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:245) 
    at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:69) 
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:181) 
    at com.sandbox.WriteLogsToBQ.main(WriteLogsToBQ.java:303) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
To confirm: are you explicitly providing a subscription to the pipeline, rather than a topic?

Correct. I have updated the post above with the problem I see when reading from a topic. – Valentin

The most recent failure may be caused by a latency-related bug. – Valentin

Answers

This is addressed by https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/547. Apologies for the trouble.

A backporting error between incubator-beam and the Google Dataflow SDK meant that, in the per-job subscription case, the returned subscription was never initialized.
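To illustrate the kind of failure described here, below is a minimal sketch in plain Java. It does not reproduce the code changed in the pull request and does not use the Dataflow SDK. The class PerJobSubscriptionSketch, its nested Source class, the methods prepareBuggy, prepareFixed and describe, and the subscription naming scheme are all hypothetical and exist only for illustration. The sketch assumes the problem has this general shape: a subscription is created for the job, but the returned value is never stored, so a later step that needs it hits a null.

```java
import java.util.Objects;

public class PerJobSubscriptionSketch {

    /** Hypothetical stand-in for a source configured with a topic or a subscription. */
    static final class Source {
        private final String topic;   // set when reading from a topic
        private String subscription;  // null until a subscription is known

        Source(String topic, String subscription) {
            this.topic = topic;
            this.subscription = subscription;
        }

        /** Buggy variant: creates a per-job subscription but drops the returned value. */
        void prepareBuggy() {
            if (subscription == null) {
                createSubscription(topic); // result discarded, field stays null
            }
        }

        /** Fixed variant: keeps the created subscription for later steps. */
        void prepareFixed() {
            if (subscription == null) {
                subscription = createSubscription(topic);
            }
        }

        /** Mimics a later step that requires a non-null subscription. */
        String describe() {
            return "subscription="
                + Objects.requireNonNull(subscription, "Input display value cannot be null");
        }

        /** Hypothetical naming scheme, for illustration only. */
        private static String createSubscription(String topic) {
            return topic.replace("/topics/", "/subscriptions/") + "_sketch";
        }
    }

    public static void main(String[] args) {
        Source buggy = new Source("projects/demo/topics/logs", null);
        buggy.prepareBuggy();
        try {
            buggy.describe();
        } catch (NullPointerException e) {
            System.out.println("buggy: NullPointerException: " + e.getMessage());
        }

        Source fixed = new Source("projects/demo/topics/logs", null);
        fixed.prepareFixed();
        System.out.println("fixed: " + fixed.describe());
    }
}
```

In the sketch, passing an explicit subscription to the constructor skips the creation branch entirely, so neither variant would leave the field null in that case.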
