次のコードを実行すると、なぜこのEOFExceptionが発生しますか?CoderException:Jacksonを使用してCustomCoderでエンコードされたJson値でGroupByKeyを実行するときのjava.io.EOFException
単純な状況で私は正常にGroupByKey
を使用しました。エラーを引き起こすように見えるのは、(Jsonオブジェクト用の)カスタムコーダーを使用していると思います。なぜこれが起こっているのか誰も説明できますか?ここで
は誤りです:ここでは
com.google.cloud.dataflow.sdk.Pipeline$PipelineExecutionException: com.google.cloud.dataflow.sdk.coders.CoderException: java.io.EOFException
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:186)
at com.google.cloud.dataflow.sdk.testing.TestPipeline.run(TestPipeline.java:106)
at com.example.dataflow.TestGroupByKeyCustomCoder.testPipeline(TestGroupByKeyCustomCoder.java:85)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: com.google.cloud.dataflow.sdk.coders.CoderException: java.io.EOFException
at com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:62)
at com.google.cloud.dataflow.sdk.coders.InstantCoder.decode(InstantCoder.java:83)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:553)
at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:98)
at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:42)
at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:157)
at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:140)
at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:134)
at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:107)
at com.google.cloud.dataflow.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readLong(DataInputStream.java:416)
at com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:58)
at com.google.cloud.dataflow.sdk.coders.InstantCoder.decode(InstantCoder.java:83)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:553)
at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:98)
at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:42)
at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:157)
at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:140)
at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:134)
at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:107)
at com.google.cloud.dataflow.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.output(ParDo.java:1303)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449)
at com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn.processElement(ReifyTimestampAndWindowsDoFn.java:38)
at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138)
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1229)
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:1098)
at com.google.cloud.dataflow.sdk.transforms.ParDo.access$300(ParDo.java:457)
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1084)
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1079)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:858)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
at com.google.cloud.dataflow.sdk.testing.TestPipeline.run(TestPipeline.java:106)
at com.example.dataflow.TestGroupByKeyCustomCoder.testPipeline(TestGroupByKeyCustomCoder.java:85)
コードです:
package com.example.dataflow;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.coders.CustomCoder;
import com.google.cloud.dataflow.sdk.testing.CoderProperties;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
class ParseJson extends DoFn<String, JsonNode> {
private static final long serialVersionUID = 1L;
private transient ObjectMapper om;
{ init(); }
private void init() {
om = new ObjectMapper();
}
private void readObject(java.io.ObjectInputStream in)
throws IOException, ClassNotFoundException {
init();
}
@Override
public void processElement(ProcessContext c) throws Exception {
JsonNode node = om.readTree(c.element());
c.output(node);
}
}
class JsonNodeCoder extends CustomCoder<JsonNode> {
private static final long serialVersionUID = 1L;
private ObjectMapper mapper = new ObjectMapper();
private static final JsonNodeCoder INSTANCE = new JsonNodeCoder();
public static JsonNodeCoder of() {
return INSTANCE;
}
@Override
public void encode(JsonNode value, OutputStream outStream, Context context) throws IOException {
mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false).writeValue(outStream, value);
}
@Override
public JsonNode decode(InputStream inStream, Context context) throws IOException {
return mapper.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false).readTree(inStream);
}
}
public class TestGroupByKeyCustomCoder {
@Test // original code the produces the error
public void testPipeline() throws IOException {
TestPipeline p = TestPipeline.create();
p.getCoderRegistry().registerCoder(JsonNode.class, JsonNodeCoder.class);
p.apply(Create.of("{}"))
.apply(ParDo.of(new ParseJson()))
.apply(WithKeys.of("foo"))
.apply("GroupByAction", GroupByKey.create());
p.run();
}
// Test as per Kenn Knowles' suggestion
// this throws the same error
@Test
public void testCustomCoder() throws Exception {
ObjectMapper mapper = new ObjectMapper();
JsonNode value = mapper.readTree("{}");
WindowedValue.FullWindowedValueCoder<JsonNode> windowedValueCoder
= WindowedValue.FullWindowedValueCoder
.of(JsonNodeCoder.of(), GlobalWindow.Coder.INSTANCE);
WindowedValue<JsonNode> x = WindowedValue.of(
value, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING);
CoderProperties.coderDecodeEncodeEqual(windowedValueCoder, x);
}
}
この問題は、結果的にデータフローが探しているタイムスタンプを飲み込む、readTreeがあまりにも多くの入力を消費することによって引き起こされているように見えます。
@Test
public void testJackson() throws IOException {
ObjectMapper mapper = new ObjectMapper();
ByteArrayInputStream bis = new ByteArrayInputStream("{}1".getBytes());
mapper.readTree(bis);
Assert.assertNotEquals(bis.read(), -1); // assertion fails
}
ご返信ありがとうございます@KennKnowles。あなたが記述したようにテストを作成しようとしましたが、それを上記のコードに追加しました。同じエラーが発生します。私は少し深く掘り下げて、 'ObjectMapper.readTree()'は有効なJSON文書の後に続く入力を無視して破棄することを発見しました。デコーダがどこで停止するかを知るために、エンコード時にすべてのデータフローコーダに出力の長さを書き込ませるのが標準的な方法ですか? –
ネストされたコンテキストでエンコードする場合は、実際にこれを管理する必要があります。そういうわけで私は答えの説明を追加しました。 ByteArrayCoderを見れば、コンテキストがストリーム全体の場合はlength-prefixから離れることがわかります。 –