2017-04-02 12 views
3

私はPubSub Sourceを持つPipelineを実行しています。私はPipelineをクラッシュさせていくつかの奇妙な例外が発生しています。私はいくつかの要素(3-10)をうまく処理することができます。そして、次の2つのエラーメッセージが突然現れます。どちらも私が間違っているかもしれないことを私に知らせるものではないので、私はすべてのTransformを削除し、ソースを残しておけば問題は解決しません。 PubSubにいくつかのテスト文字列をポストしています。どんな助けもありがとうございます。Apache Beam PubSub Readerの例外

例外1:

[WARNING] 
java.lang.reflect.InvocationTargetException 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) 
     at java.lang.Thread.run(Thread.java:724) 
Caused by: java.lang.NullPointerException 
     at org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubReader.ackBatch(PubsubUnboundedSource.java:640) 
     at org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubCheckpoint.finalizeCheckpoint(PubsubUnboundedSource.java:313) 
     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:174) 
     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:127) 
     at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139) 
     at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 

例外2:

[WARNING] 
java.lang.reflect.InvocationTargetException 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) 
     at java.lang.Thread.run(Thread.java:724) 
Caused by: java.lang.IllegalStateException: Cannot finalize a restored checkpoint 
     at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:444) 
     at org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubCheckpoint.finalizeCheckpoint(PubsubUnboundedSource.java:293) 
     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.finishRead(UnboundedReadEvaluatorFactory.java:205) 
     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:142) 
     at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139) 
     at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 

Basicコード:

PipelineOptions options = PipelineOptionsFactory.create(); 
PubsubOptions dataflowOptions = options.as(PubsubOptions.class); 
dataflowOptions.setStreaming(true); 

Pipeline p = Pipeline.create(options); 

p.apply(PubsubIO.<String>read().subscription("my-subscription") 
    .withCoder(StringUtf8Coder.of()))); 

実行:

mvn compile exec:java -Dexec.mainClass=my.package.SalesTransactions -Dexec.args="--runner BlockingDataflowRunner --project=my-project --tempLocation=gs://my-project/tmp" 

答えて

1

この問題は、DirectRunnerのバグ(BEAM-1656)とPubsubCheckpoint内の前提条件のために存在します。

回答Apache Beam: PubsubReader fails with NPEには、バグとその解決方法の詳細が含まれています。ありがとう!

+0

あなたの答えをありがとう。私は最新のスナップショットに更新し、すぐには起こらなかったので、最初の希望だったが、上記のNullPointerExceptionがまだ起こっていると思われる。 – jimmy

+1

https://github.com/apache/beam/pull/2368で解決されたPubSubのバグがありましたが、すぐにコミットされています。ご迷惑をおかけし申し訳ございません。 –

関連する問題