データフローランナーを使用してダイレクトランナーとローカルで両方ともパイプラインを実行するapache-beamアプリケーションがあります。ローカルでは動作しますが、googleのデータフローランナーでは失敗します。ここApache Beamで実行時に作成されたクラスをシリアライズする方法
エラートレースは、次のとおり
(9938ce94c0752c7):java.lang.RuntimeException:com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentExceptionが:com.google.cloud.dataflow.worker.MapTaskExecutorFactory $ 3.typedApply(MapTaskExecutorFactory.java:283) で直列化されたDoFnInfo をデシリアライズすることができないでcom.google.cloud.dataflow.worker.MapTaskExecutorFactory $ 3.typedApply (MapTaskExecutorFactory.java:253) (com.google.cloud.dataflow.worker.graph.Networks)$ TypeSafeNodeFunction.apply(Networks.java:55) at com.google.cloud.dataflow.worker.graph.Networks $ TypeSafeNodeFunction.apply(Networks.java:43) (com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78) at) com.google.cloud.dataflow.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:142) (com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:271) 、com.google.cloud) dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244) (com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness)$ WorkerThread.doWork(DataflowBatchWorkerHarness.java:135) (com.google.cloud.dataflow.worker) DataflowBatchWorkerHarness $ WorkerThread.call(DataflowBatchWorkerHarness.java:115) at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness $ W orkerThread.call(DataflowBatchWorkerHarness.java:102) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java。 (Thread.java:745)
原因:com.google.cloud.dataflow.worker.repackaged.com .google.common.util.concurrent.UncheckedExecutionException:java.lang.IllegalArgumentException:直列化されたDoFnInfoを直列化解除できない場合 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ Segment.get( LocalCache.java:2214) (com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)) (com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ LocalManualCache.get(LocalCache.java:4899) at com.google.cloud.dataflow.worker.UserParDoFnFactory.create UserParDoFnFactory.java:95) at com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:66) at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:360) com.google.cloud.dataflow.worker.MapTaskExecutorFactory $ 3.typedApply(MapTaskExecutorFactory.java:271) で ... 14もっと
によって引き起こさ:java.lang.IllegalArgumentExceptionが:org.apacheで直列化されたDoFnInfo をデシリアライズすることができません.beam.sdk.util.SerializableUtils.deserializeFromByteArray(Seriali zooUtils.java:75) at com.google.cloud.dataflow.worker.UserParDoFnFactory $ UserDoFnExtractor.getDoFnInfo(UserParDoFnFactory.java:64) at com.google.cloud.dataflow.worker.UserParDoFnFactory $ 1.call(UserParDoFnFactory.java :100) (com.google.cloud.dataflow.worker.UserParDoFnFactory $ 1.call(UserParDoFnFactory.java:97) 、com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ LocalManualCache $ 1.load(LocalCache.java:4904) (com.google.cloud.dataflow.worker.repackaged.com.google.common.cache)LocalCache $ LoadingValueReference.loadFuture(LocalCache.java:3628)com35.png。 google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ Segment.lockedGetOrLoad(LocalCache.java:2295) (com.google.cloud.dataflow.worker.repackaged.com.google.common)。 cache.LocalCache $ Segment.get(LocalCache.java:2208)によって引き起こさ ... 20もっと
:java.lang.ClassNotFoundExceptionが:java.net.URLClassLoader.findClassでHeader_H (URLClassLoader.java:381) でjava.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher $ AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) (java.lang.Class.forName0)(ネイティブメソッド) at java.lang.Class.forName(Class.java:348) at java.io .ObjectInputStream.resolveClass(ObjectInputStream.java:628) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) at java.io.ObjectInputStream .readClass(ObjectInputStream.java:1486) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1336) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:1942) at java.io.Object InputStream.readOrdinaryObject(ObjectInputStream.java:1808) のjava.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) のjava.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) のjava.io.ObjectInputStreamです。 readSerialData(ObjectInputStream.java:1942) のjava.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) のjava.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) のjava.io.ObjectInputStream.readObject( org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArrayでObjectInputStream.java:373) (SerializableUtils.java:72) ... 28もっと
それは
"...直列化DoFnInfoをデシリアライズすることができない"
と
をを指して、 "...にjava.lang.ClassNotFoundException:Header_H"
これは私がbytebuddyコードを使ってクラスHeader_H
を作るのと関係があると思われます。私はbytebuddyを使用して、既存のソースコードにsome.class
に基づいてサブクラスを構築し、実行時に構成ファイルから追加のユーザー入力を受け取りました。つまり、実行時にHeader_H
しか利用できなくなりました。
私bytebuddyコードは多少このようなものです:
builder = new ByteBuddy().subclass(some.class).name("Header_H").modifiers(PUBLIC);
.defineField("serialVersionUID", long.class, STATIC, PRIVATE, FINAL).value(37L)
.implement(Serializable.class);
Class <?> clazz = builder.make().load(getClass().getClassLoader()).getLoaded();
そしてclazz
(この場合はHeader_H
に)データフロー内のパイプラインに渡されます。一時的なGoogleクラウドステージの場所にあるjarファイルの内容を確認すると、some.class
が表示されますが、Header_H.class
ではなく、おそらく "ClassNotFoundException"というエラーが発生します。
私の推論が正しければ、クラス作成時にimplement(Serializable.class)
があるとすれば、データフローランナーに送信するjarファイルにランタイム作成クラスを配置するにはどうすればよいですか?
上記のコードでは、 'type.inject(somejar)'と 'type.saveIn(somefolder)'を意味しますか? – bignano