2017-08-09 10 views
0

Apache Beam 2.0.0と同じバージョンのFlinkRunner(scala 2.10)を使用しています。私はFlinkRunner依存関係を持つインプロセスFlinkマスター(デフォルト構成)に対して、実行時にFlink 1.2.1を明らかに導入しています(MVN依存関係ツリーを参照)。Apache Beam/Flink ExceptionInChainedStubException

「ユーザー例外」が発生したときに実際に何がうまくいかなかったかを把握する最良の方法は何ですか?これは今回私が間違っていたことに関する質問ではありません。むしろ一般的にどのようにBeamかFlinkのいずれかからより多くの情報を得る方法を伝える方法です。ここでスタックトレースです:私が書いたコードに関連するものの完全な欠如

Exception in thread "main" java.lang.RuntimeException: Pipeline execution failed 
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:122) 
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295) 
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281) 
at com.mapfit.flow.data.environment.MFEnvironment.run(MFEnvironment.java:70) 
at com.mapfit.flow.main.Scratch.main(Scratch.java:35) 
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:910) 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853) 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853) 
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException 
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) 
at org.apache.beam.sdk.transforms.MapElements$1$auxiliary$PCieS8xh.invokeProcessElement(Unknown Source) 
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:197) 
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:158) 
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65) 
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:118) 
at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103) 
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) 
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException 
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:82) 
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) 
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$MultiDoFnOutputManager.output(FlinkDoFnFunction.java:165) 
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:355) 
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:629) 
at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122) 

お知らせ(私の呼び出し以外はpipeline.runします())。

私は私のリンク瓶のそれぞれのソースコードをダウンロードして、私は(私の値が使用ライン82上の例外を投げ、そして最終的にはJavaオブジェクトのシリアライズに呼び出しによって生成されてEOFExceptionを見終わったChainedFlatMapDriverに足を踏み入れデフォルトのコーダー)。私は何かに夢中になったと思っていましたが、EOFExceptionの原因はSimpleCollectingOutputView行79にあり、多くの場合スローされ、Flinkのルーチン実行と思われるように飲み込まれることがよくあります。

Flinkに障害情報を開示する方法を知っている人の指摘はありますか?

Just found more info after walking through more Flink code in the debugger: java.lang.InterruptedException 
at java.lang.Object.wait(Native Method) 
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168) 
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138) 
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131) 
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88) 
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) 
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) 
at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:46) 
at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:30) 
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80) 
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) 
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$MultiDoFnOutputManager.output(FlinkDoFnFunction.java:165) 
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:355) 
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:629) 
at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122) 
at org.apache.beam.sdk.transforms.MapElements$1$auxiliary$vuuNRtio.invokeProcessElement(Unknown Source) 
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:197) 
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:158) 
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65) 
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:118) 
at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103) 
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) 
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665) 
at java.lang.Thread.run(Thread.java:745) 

答えて

1

をこれら二つのリンクを見てみましょう:糸にflinkrunner上でビームを実行中に EOFException related to memory segments during run of Beam pipeline on Flink

https://issues.apache.org/jira/browse/BEAM-2831

を、私は同様の例外を参照するために使用詳細については、デバッグ後に発見された

。問題のページに提案されたコーダーが助けになりました。

パイプラインが円滑に動作するまで、ロガーを広範囲に使用することをお勧めします。ヤーンログでは、ヤーンログコマンドで検索できます。あなたのケース(インラインプロセスのフリンクマスター)についてはわかりませんが、私が想定しているいくつかのログを書くことができるはずです...

+0

私はこのケースを開いたので、あなたが言及したリンクは存在しました)、いつものように誰も助けることができませんでした。そして、リンクで言及されたのと基本的に同じ方法で問題を回避し、このチケットを無視しました:) – mephicide