2017-12-27 13 views
0

私は既存のパイプラインをデータフロー2.xに移行しています。パイプラインの最終段階では、データはGoogle Cloud Servicesに書き込まれます。データは.gzに圧縮する必要があります。以前は(データフロー1.xの実装では)私たちはこれを行うために独自のシンクを作成していました。データフロー2.xでは、これを行うための組み込みの方法があります。正しいコードでなければならないものがありますが、javaコンパイラは誤った型を返すTextIO.write()について不平を言っています。コードは以下の通りです:Dataflow 2.xは、PCollectionTuple.apply()の呼び出し時に間違ったパラメータタイプについて不平を言います

PCollectionTuple results = /* some transforms */ 

// write main result 
results.get(mainOutputTag). 
apply("WriteProfile", TextIO.write().to(outputBucket) 
.withSuffix(".json")   
.withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP) 
.withNumShards(numChunks)); 

Javaのコンパイラは、このエラーで、不平を言っている:誰もが、問題は、上記の私のコードであるかもしれないものを

The method apply(String, PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (String, TextIO.Write) 

見ていますか?より多くの文脈が必要な場合は教えてください。

答えて

0

私は問題を解決しました。問題は、PCollection<TableRow>をファイルに書き込もうとしていたことです。ファイルにはPCollection<String>しか書き込めません。

これは私の最終的な解決策だった:

PCollectionTuple results = /* some transforms */ 

// write main result 
results.get(mainOutputTag) /* PCollection<TableRow> */ 

    .apply(ParDo.of(new DoFn<TableRow, String>() { 
     @ProcessElement 
     public void processElement(ProcessContext c) { 
      c.output(c.element().toString()); 
     } 
    })) /* PCollection<String> */ 

    .apply("WriteProfile", TextIO.write().to(outputBucket) 
    .withSuffix(".json")   
    .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP) 
    .withNumShards(numChunks)); 
関連する問題