2016-03-23 19 views
1

データフローを使用してGoGroupByKeyを実行しようとするとこのエラーが発生します。高レベルでは、私はタイプKV<String, self-defined-class>とタイプKV<String, TableRow>の2つのPCollectionに参加したいと思います。私はほとんどの場合official documentGroupByKey.GroupByKeyOnlyのトランスレータが登録されていません

PCollection<KV<String, TableRow>> pt1 = ...; 
    PCollection<KV<String, MyClass>> pt2 = ...; 
    final TupleTag<TableRow> t1 = new TupleTag<>(); 
    final TupleTag<MyClass> t2 = new TupleTag<>(); 
    PCollection<KV<String, CoGbkResult>> coGbkResultCollection = 
    KeyedPCollectionTuple.of(t1, pt1) 
        .and(t2, pt2) 
        .apply(CoGroupByKey.<String>create()); 

に記載された例と非常によく似TupleTags、KeyedPCollectionとCoGroupByKeyで参加標準をやって、私は(それが何を意味するかの並べ替えの混乱していていることが分かったし、少し周りを検索データフローの「サービス」クエリをジョブに「翻訳」しても、それが技術的に何を意味するのかはまだ分かりません)、潜在的に何が起こっているのか(特にGroupByKeyOnlyが発生している場合)私のコード片をデバッグするためのヒントを取るだけです。

次のように全体のスタックトレースは次のとおりです。

Exception in thread "main" java.lang.IllegalStateException: no translator registered for GroupByKey.GroupByKeyOnly 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator$Translator.visitTransform(DataflowPipelineTranslator.java:500) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) 
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:455) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:146) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.run(DataflowPipelineRunner.java:325) 
at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.run(BlockingDataflowPipelineRunner.java:95) 

はFYI私は、ソースコードの周りつついたBlockingDataflowPipelineRunner

EDIT、とJavaライブラリを使用しています私はDataflowPipelineTranslator.javaがTranformerを登録していないので、それがあることが判明GroupByKeyOnlyがDataflowPipelineRunnerであるため、DataflowPipelineOptions(およびその拡張のいずれか)で実行されているパイプラインにGroupByKeyOnlyが登録されます...?

+0

使用しているDataflow SDKのバージョンはどれですか? –

+0

'com.google.cloud.dataflow:google-cloud-dataflow-java-sdk-all:1.4.0' –

+0

新しい1.5.0 SDKをお試しいただけますか? –

答えて

1

GroupByKeyOnlyは、DataflowPipelineRunnerのグラフに適用されたトランスフォームセットには一度も現れてはいけません。これは、パイプラインがPipelineOptionsに設定されていない状態でパイプラインが構築され、[Blocking] DataflowPipelineRunner.run (パイプライン)。期待されるパターンとは、例えば、直接DataflowPipeline/DataflowPipelineRunnerメソッドを使用しないことです。

PipelineOptions options = PipelineOptionsFactory.fromArgs(args); 

// Make sure that runner is set before calling Pipeline.create(options) 
Pipeline p = Pipeline.create(options); 

// Apply all your transforms 
p.apply(... transforms ...); 

PipelineResult result = p.run(); 

上記の例では、あなたのアプリケーションにコマンドラインパラメータを調整することで、ランナーを交換することができるようになります。たとえば、BlockingDataflowPipelineRunnerを使用すると、ジョブ結果がp.run()から戻る前に終端状態になっていることが保証されます。

関連する問題