2016-06-23 7 views
2

次のコードを実行すると、なぜこのEOFExceptionが発生しますか?CoderException:Jacksonを使用してCustomCoderでエンコードされたJson値でGroupByKeyを実行するときのjava.io.EOFException

単純な状況で私は正常にGroupByKeyを使用しました。エラーを引き起こすように見えるのは、(Jsonオブジェクト用の)カスタムコーダーを使用していると思います。なぜこれが起こっているのか誰も説明できますか?ここで

は誤りです:ここでは

com.google.cloud.dataflow.sdk.Pipeline$PipelineExecutionException: com.google.cloud.dataflow.sdk.coders.CoderException: java.io.EOFException 

    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:186) 
    at com.google.cloud.dataflow.sdk.testing.TestPipeline.run(TestPipeline.java:106) 
    at com.example.dataflow.TestGroupByKeyCustomCoder.testPipeline(TestGroupByKeyCustomCoder.java:85) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) 
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) 
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) 
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) 
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137) 
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119) 
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42) 
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234) 
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) 
Caused by: com.google.cloud.dataflow.sdk.coders.CoderException: java.io.EOFException 
    at com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:62) 
    at com.google.cloud.dataflow.sdk.coders.InstantCoder.decode(InstantCoder.java:83) 
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621) 
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:553) 
    at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:98) 
    at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:42) 
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:157) 
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:140) 
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:134) 
    at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:107) 
    at com.google.cloud.dataflow.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44) 
Caused by: java.io.EOFException 
    at java.io.DataInputStream.readFully(DataInputStream.java:197) 
    at java.io.DataInputStream.readLong(DataInputStream.java:416) 
    at com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:58) 
    at com.google.cloud.dataflow.sdk.coders.InstantCoder.decode(InstantCoder.java:83) 
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621) 
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:553) 
    at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:98) 
    at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:42) 
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:157) 
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:140) 
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:134) 
    at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:107) 
    at com.google.cloud.dataflow.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.output(ParDo.java:1303) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449) 
    at com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn.processElement(ReifyTimestampAndWindowsDoFn.java:38) 
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1229) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:1098) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo.access$300(ParDo.java:457) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1084) 
    at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1079) 
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:858) 
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) 
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) 
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259) 
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814) 
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526) 
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96) 
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180) 
    at com.google.cloud.dataflow.sdk.testing.TestPipeline.run(TestPipeline.java:106) 
    at com.example.dataflow.TestGroupByKeyCustomCoder.testPipeline(TestGroupByKeyCustomCoder.java:85) 

コードです:

package com.example.dataflow; 

import com.fasterxml.jackson.core.JsonGenerator; 
import com.fasterxml.jackson.core.JsonParser; 
import com.fasterxml.jackson.databind.JsonNode; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import com.google.cloud.dataflow.sdk.coders.CustomCoder; 
import com.google.cloud.dataflow.sdk.testing.CoderProperties; 
import com.google.cloud.dataflow.sdk.testing.TestPipeline; 
import com.google.cloud.dataflow.sdk.transforms.*; 
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; 
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; 
import com.google.cloud.dataflow.sdk.util.WindowedValue; 
import org.joda.time.Instant; 
import org.junit.Assert; 
import org.junit.Test; 

import java.io.ByteArrayInputStream; 
import java.io.IOException; 
import java.io.InputStream; 
import java.io.OutputStream; 


class ParseJson extends DoFn<String, JsonNode> { 

    private static final long serialVersionUID = 1L; 
    private transient ObjectMapper om; 

    { init(); } 

    private void init() { 
     om = new ObjectMapper(); 
    } 

    private void readObject(java.io.ObjectInputStream in) 
      throws IOException, ClassNotFoundException { 
     init(); 
    } 

    @Override 
    public void processElement(ProcessContext c) throws Exception { 
     JsonNode node = om.readTree(c.element()); 
     c.output(node); 
    } 
} 

class JsonNodeCoder extends CustomCoder<JsonNode> { 

    private static final long serialVersionUID = 1L; 

    private ObjectMapper mapper = new ObjectMapper(); 

    private static final JsonNodeCoder INSTANCE = new JsonNodeCoder(); 

    public static JsonNodeCoder of() { 
     return INSTANCE; 
    } 

    @Override 
    public void encode(JsonNode value, OutputStream outStream, Context context) throws IOException { 
     mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false).writeValue(outStream, value); 
    } 

    @Override 
    public JsonNode decode(InputStream inStream, Context context) throws IOException { 
     return mapper.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false).readTree(inStream); 
    } 
} 

public class TestGroupByKeyCustomCoder { 

    @Test // original code the produces the error 
    public void testPipeline() throws IOException { 

     TestPipeline p = TestPipeline.create(); 

     p.getCoderRegistry().registerCoder(JsonNode.class, JsonNodeCoder.class); 

     p.apply(Create.of("{}")) 
       .apply(ParDo.of(new ParseJson())) 
       .apply(WithKeys.of("foo")) 
       .apply("GroupByAction", GroupByKey.create()); 

     p.run(); 
    } 

    // Test as per Kenn Knowles' suggestion 
    // this throws the same error 
    @Test 
    public void testCustomCoder() throws Exception { 
     ObjectMapper mapper = new ObjectMapper(); 
     JsonNode value = mapper.readTree("{}"); 

     WindowedValue.FullWindowedValueCoder<JsonNode> windowedValueCoder 
       = WindowedValue.FullWindowedValueCoder 
        .of(JsonNodeCoder.of(), GlobalWindow.Coder.INSTANCE); 

     WindowedValue<JsonNode> x = WindowedValue.of(
       value, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING); 
     CoderProperties.coderDecodeEncodeEqual(windowedValueCoder, x); 
    } 
} 

この問題は、結果的にデータフローが探しているタイムスタンプを飲み込む、readTreeがあまりにも多くの入力を消費することによって引き起こされているように見えます。

@Test 
public void testJackson() throws IOException { 
    ObjectMapper mapper = new ObjectMapper(); 
    ByteArrayInputStream bis = new ByteArrayInputStream("{}1".getBytes()); 
    mapper.readTree(bis); 
    Assert.assertNotEquals(bis.read(), -1); // assertion fails 
} 

答えて

2

スタックトレースは、tの時点でファイルの最後に到達したことを示します。彼はビッグエンディアンlongのタイムスタンプが解析されています。

encoding used by WindowedValue.FullWindowedValueCoderはエンコードされた値で、その後にタイムスタンプが続き、その後にウィンドウと最後にペインのメタデータが続きます。したがって、これはJsonCoderが入力ストリームから多すぎるバイトを消費していることを意味します(多分それらのすべてでしょうか?)ので、タイムスタンプのデコードがファイルの最後に当たることを意味します。

SDKは、CoderPropertiesでコーダーをテストするための多くのユーティリティを提供しています。実際には、CoderProperties#coderDecodeEncodeEqualをコーダーWindowedValue.FullWindowedValueCoder.of(JsonCoder.of(), new GlobalWindow.Coder())で実行して、このケースをグローバルウィンドウに直接テストすることができます。

encodedecodeには、Coder.Contextに気付いている可能性が高いフラグが渡されています。

  • Coder.Context.OUTERは、コーダーが最も外側のCoderであり、ストリーム全体を所有していることを示します。この場合、エンコード時にEOF信号を活用し、長さ接頭辞やかっこなどのメタデータを除外し、デコード時に好きなだけ消費することができます。
  • Coder.Context.NESTEDは、Coderが値の一部をエンコードしていることを示しているため、独自のエンコードからバイトだけをインテリジェントに消費できる十分なメタデータを書き込む必要があります。
+0

ご返信ありがとうございます@KennKnowles。あなたが記述したようにテストを作成しようとしましたが、それを上記のコードに追加しました。同じエラーが発生します。私は少し深く掘り下げて、 'ObjectMapper.readTree()'は有効なJSON文書の後に続く入力を無視して破棄することを発見しました。デコーダがどこで停止するかを知るために、エンコード時にすべてのデータフローコーダに出力の長さを書き込ませるのが標準的な方法ですか? –

+0

ネストされたコンテキストでエンコードする場合は、実際にこれを管理する必要があります。そういうわけで私は答えの説明を追加しました。 ByteArrayCoderを見れば、コンテキストがストリーム全体の場合はlength-prefixから離れることがわかります。 –

関連する問題