2016-06-14 9 views
2

新しいGoogleのデータフローバージョン1.6にアップグレードしました。ローカルマシンでテストすると、パイプラインの最後にjava.lang.IllegalStateExceptionが発生します。バージョン1.5.1ではこの問題はありませんでした。Google Dataflowでjava.lang.IllegalStateExceptionが表示されるのはなぜですか?

ローカルのライブ環境では発生しません。それは新しいバージョンのバグですか?それらのエラーを避けるためにコードに変更を加える必要がありますか?

問題を見つけるためにパイプラインの一部を添付しました。

private static void getTableRowAndWrite(final PCollection<KV<Integer, Iterable<byte[]>>> groupedTransactions, final String tableName) { 
    // Get the tableRow element from the PCollection 
    groupedTransactions 
      .apply(ParDo 
        .of(((tableName.equals("avail")) ? new GetTableRowAvail() : new GetTableRowReservation())) //Get a TableRow 
        .named("Get " + tableName + " TableRows")) 
      .apply(BigQueryIO 
        .Write 
        .named("Write to BigQuery " + tableName) //Write to BigQuery 
        .withSchema(createTableSchema()) 
        .to((SerializableFunction<BoundedWindow, String>) window -> { 
         String date = window.toString(); 
         String date2 = date.substring(1, 5) + date.substring(6, 8) + date.substring(9, 11); 
         return "travelinsights-1056:hotel." + tableName + "_full_" + (TEST ? "test_" : "") + date2; 
        }) 
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
      ); 
} 

エラーは次のとおりです。

Exception in thread "main" java.lang.IllegalStateException: Cleanup time 294293-06-23T12:00:54.774Z is beyond end-of-time 
at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199) 
at com.google.cloud.dataflow.sdk.util.ReduceFnRunner.onTimer(ReduceFnRunner.java:642) 
at com.google.cloud.dataflow.sdk.util.BatchTimerInternals.advance(BatchTimerInternals.java:134) 
at com.google.cloud.dataflow.sdk.util.BatchTimerInternals.advanceInputWatermark(BatchTimerInternals.java:110) 
at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:91) 
at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) 
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) 
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1229) 
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:1098) 
at com.google.cloud.dataflow.sdk.transforms.ParDo.access$300(ParDo.java:457) 
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1084) 
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1079) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:858) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) 
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96) 
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180) 
+1

私は問題があなたのパイプラインのどこかにあると信じています。スタックトレースは、ウィンドウの終わりに許容された遅延とDataflowで許容される最大タイムスタンプを超えるようなウィンドウを持つことを意味します。あなたの要素にタイムスタンプを置き、それらをウィンドウに入れるパイプラインの部分を共有しようと思いますか? –

答えて

3

あなたがバグを見つけました!

これはBEAM-341として提出されており、この修正はレビューの直後にDataflow Java SDKに移植される#464としてレビューされています。

ウィンドウ処理、トリガリング、および許容遅延を設定するコードは表示されませんが、これがどのように影響しているかはわかりません。しかし、非グローバルウィンドウと非常に大きな許容待ち時間があると、ウィンドウが "終了時間"までに期限切れにならないようにするには、簡単な回避策があります。この場合、事実上無限大の代わりに(数百年のように)非常に大きい許容待ち時間でジョブを更新することができます。

関連する問題