2016-10-04 26 views
1

私は例外を取得し続けているので( 'xxxをシリアライズできません')、サイド出力を出す必要があるジョブに苦労しています。OutputTagsでジョブを実行することができません

は、私は明示的に私が働いているタイプのコーダを指定した場合でも、私は同じエラーを取得保管ので、私は、このドキュメント以下のような簡単な仕事を書くことにした。驚いたことに

https://cloud.google.com/dataflow/model/par-do#tags-for-side-outputs

、私はまだ同じ種類の例外を取得し、今私は間違ったことをしたに違いないと思う(しかし、私はそれを実際に理解することはできない)。コードに関する限り、私は上記の例に従いました。

以下、私はソースコードを掲載していますが、実行するとエラーメッセージが表示されます。私はこれが再現可能であると信じています(あなたが所有しているバケットに 'GCS_BUCKET'を変更し、argsを使って 'TestSideOutput'を呼び出すmain()メソッドを作成します)、他の誰かが再現できるかどうかを知ることは良いでしょう。 JDK 8とDataflow SDK 1.7.0を使用しています。

上記のドキュメントの例ではDoFnを拡張した匿名クラスを使用していますが、私も試しましたが、同じ種類のエラーメッセージがあります。以下のコードはこのクラスを名前付き内部クラス( 'Filter')にリファクタリングします。

中括弧( "{}")を使わずにTupleTagsを初期化しようとしました。これは実際には警告を発生させるため、例外につながります(この記事の最後のコードスニペットを参照)。ここで

は、私が使用するコードです:

package tmp.dataflow.experimental; 

import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; 
import com.google.cloud.dataflow.sdk.io.TextIO; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner; 
import com.google.cloud.dataflow.sdk.transforms.DoFn; 
import com.google.cloud.dataflow.sdk.transforms.ParDo; 
import com.google.cloud.dataflow.sdk.values.PCollection; 
import com.google.cloud.dataflow.sdk.values.PCollectionTuple; 
import com.google.cloud.dataflow.sdk.values.TupleTag; 
import com.google.cloud.dataflow.sdk.values.TupleTagList; 
import com.moloco.dataflow.DataflowConstants; 

public class TestSideOutput { 
    private TestOptions options; 
    private static final String GCS_BUCKET = "gs://dataflow-experimental/"; // Change to your bucket name 

    public TestSideOutput(String[] args) { 
    options = PipelineOptionsFactory.fromArgs(args).as(TestOptions.class); 
    options.setProject(DataflowConstants.PROJCET_NAME); 
    options.setStagingLocation(DataflowConstants.STAGING_BUCKET); 
    options.setRunner(BlockingDataflowPipelineRunner.class); 
    options.setJobName(options.getJob() + "-test-sideoutput"); 
    } 

    public void execute() { 
    Pipeline pipeline = Pipeline.create(options); 
    // 1. Read sample data. 
    PCollection<String> profiles = pipeline.apply(TextIO.Read.named("reading") 
     .from(GCS_BUCKET + "example/sample-data/sample-data*").withCoder(StringUtf8Coder.of())); 

    // 2. Create tags for outputs. 
    final TupleTag<String> mainTag = new TupleTag<String>() {}; 
    final TupleTag<String> sideTag = new TupleTag<String>() {}; 

    // 3. Apply ParDo with side output tags. 
    Filter filter = new Filter("DATAFLOW", sideTag); 
    PCollectionTuple results = 
     profiles.apply(ParDo.named("FilterByKeyword").withOutputTags(mainTag, TupleTagList.of(sideTag)).of(filter)); 

    // 4. Retrieve outputs. 
    PCollection<String> mainOutput = results.get(mainTag); 
    PCollection<String> sideOutput = results.get(sideTag); 

    // 5. Write to GCS. 
    mainOutput.apply(
     TextIO.Write.named("writingMain").to(GCS_BUCKET + "example/main-output/main").withCoder(StringUtf8Coder.of())); 
    sideOutput.apply(
     TextIO.Write.named("writingSide").to(GCS_BUCKET + "example/side-output/side").withCoder(StringUtf8Coder.of())); 

    // 6. Run pipeline. 
    pipeline.run(); 
    } 

    static class Filter extends DoFn<String, String> { 
    private static final long serialVersionUID = 0; 
    final TupleTag<String> sideTag; 
    String keyword; 

    public Filter(String keyword, TupleTag<String> sideTag) { 
     this.sideTag = sideTag; 
     this.keyword = keyword; 
    } 

    @Override 
    public void processElement(ProcessContext c) throws Exception { 
     String profile = c.element(); 
     if (profile.contains(keyword)) { 
     c.output(profile); 
     } else { 
     c.sideOutput(sideTag, profile); 
     } 
    } 
    } 
} 

そして、これは私が使用するコマンド、およびエラー/例外で、私は(それだけで我々はデータフローパッケージに使用するいくつかのコマンドライン引数が含まれていました、ここでは何も特別な、ちょうどあなたのアイデアを与えるために):また

dataflow-20161003.R3$ ./bin/dataflow --job=test-experimental-sideoutput --numWorkers=1 --date=0001-01-01 
Oct 04, 2016 12:37:34 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions 
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 121 files. Enable logging at DEBUG level to see which files will be staged. 
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize [email protected] 
     at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54) 
     at com.google.cloud.dataflow.sdk.util.SerializableUtils.clone(SerializableUtils.java:91) 
     at com.google.cloud.dataflow.sdk.transforms.ParDo$BoundMulti.<init>(ParDo.java:959) 
     at com.google.cloud.dataflow.sdk.transforms.ParDo$UnboundMulti.of(ParDo.java:912) 
     at com.google.cloud.dataflow.sdk.transforms.ParDo$UnboundMulti.of(ParDo.java:908) 
     at tmp.dataflow.experimental.TestSideOutput.execute(TestSideOutput.java:41) 
     at com.moloco.dataflow.Main.main(Main.java:152) 
Caused by: java.io.NotSerializableException: tmp.dataflow.experimental.TestSideOutput 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
     at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50) 
     ... 6 more 

、私はこれが関連していると思わないないが、「TestOptions」クラスのコード:

package tmp.dataflow.experimental; 

import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; 
import com.google.cloud.dataflow.sdk.options.Description; 
import com.google.cloud.dataflow.sdk.options.Validation; 

public interface TestOptions extends DataflowPipelineOptions { 
    @Description("Job") 
    @Validation.Required 
    String getJob(); 

    void setJob(String value); 

    @Description("Job suffix") 
    String getJobSuffix(); 

    void setJobSuffix(String value); 

    @Description("Date") 
    @Validation.Required 
    String getDate(); 

    void setDate(String value); 
} 

最後に、TupleTagsをインスタンス化するときに中括弧 "{}"を削除すると、次のような例外が発生します(このようなことを避けるために、{{} "でインスタンス化する必要があります問題):

Oct 04, 2016 12:43:56 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions 
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 122 files. Enable logging at DEBUG level to see which files will be staged. 
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for FilterByKeyword.out1 [PCollection]. Correct one of the following root causes: 
    No Coder has been manually specified; you may do so using .setCoder(). 
    Inferring a Coder from the CoderRegistry failed: Cannot provide a coder for type variable V (declared by class com.google.cloud.dataflow.sdk.values.TupleTag) because the actual type is unknown due to erasure. If this error occurs for a side output of the producing ParDo, verify that the TupleTag for this output is constructed with proper type information (see TupleTag Javadoc) or explicitly set the Coder to use if this is not possible. 
    Using the default output Coder from the producing PTransform failed: Cannot provide a coder for type variable V (declared by class com.google.cloud.dataflow.sdk.values.TupleTag) because the actual type is unknown due to erasure. 
     at com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:195) 
     at com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48) 
     at com.google.cloud.dataflow.sdk.values.PCollection.getCoder(PCollection.java:137) 
     at com.google.cloud.dataflow.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:88) 
     at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:331) 
     at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274) 
     at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161) 
     at tmp.dataflow.experimental.TestSideOutput.execute(TestSideOutput.java:50) 
     at com.moloco.dataflow.Main.main(Main.java:152) 

編集:() '静的' を実行することによって、これを解決するための以下の回答を参照してください。

以下のコードは、私が最初に投稿したものと似ていますが、2つの変更点があります。 可能な限り、私は各PCollectionに対して明示的に(冗長に) 'コーダー'を指定します。さらに、TupleTagsをインスタンス化するときに、中括弧は使用しません。

どのアプローチ(静的対冗長アプローチ)が適切であることに注意してください。あなたのFilter fnが(それは非静的関数​​からインスタンス化された非静的匿名クラスだから)順番に囲むTestSideOutput参照TupleTagを、参照しているため

public void execute() { 
    Pipeline pipeline = Pipeline.create(options); 
    // 1. Read sample data. 
    PCollection<String> profiles = pipeline.apply(TextIO.Read.named("reading") 
     .from(GCS_BUCKET + "example/sample-data/sample-data*").withCoder(StringUtf8Coder.of())); 

    // 2. Create tags for outputs. 
    final TupleTag<String> mainTag = new TupleTag<String>(); 
    final TupleTag<String> sideTag = new TupleTag<String>(); 

    // 3. Apply ParDo with side output tags. 
    Filter filter = new Filter("DATAFLOW", sideTag); 
    PCollectionTuple results = profiles.setCoder(StringUtf8Coder.of()) 
     .apply(ParDo.named("FilterByKeyword").withOutputTags(mainTag, TupleTagList.of(sideTag)).of(filter)); 

    // 4. Retrieve outputs. 
    PCollection<String> mainOutput = results.get(mainTag); 
    PCollection<String> sideOutput = results.get(sideTag); 

    // 5. Write to GCS. 
    mainOutput.setCoder(StringUtf8Coder.of()).apply(TextIO.Write.named("writingMain") 
     .to(GCS_BUCKET + "example/main-output-from-nonstatic/main").withCoder(StringUtf8Coder.of())); 
    sideOutput.setCoder(StringUtf8Coder.of()).apply(TextIO.Write.named("writingSide") 
     .to(GCS_BUCKET + "example/side-output-from-nonstatic/side").withCoder(StringUtf8Coder.of())); 

    // 6. Run pipeline. 
    pipeline.run(); 
    } 

答えて

2

あなたが取得しているエラーです。

パイプラインはTestSideOutputオブジェクトをシリアル化しようとしており、メッセージで示されるようにシリアル化できません。java.io.NotSerializableException: tmp.dataflow.experimental.TestSideOutput

根本的な原因は、メソッド​​が静的​​でないことです。静的にすると問題が解決するはずです。

+0

実際、私が持っていた問題を解決するために提案されたのは何ですか。ありがとうございました! 一方、サイド出力タグでParDoを適用する非静的なexecute()メソッドを持つ別のジョブがありますが、例外がスローされることはありません(それは、上記のサンプルコードを部分的に書いた理由の一部です私にとっては奇妙に思えました)。私はこの時点でそのコストを実際に投稿することはできませんが、execute()メソッドを静的にせずにこの問題を解決する別の方法があるのでしょうか? –

+0

私はちょっとフォローアップの質問に答えました(私の編集した質問の最後に追加されたコードスニペットを見てください)。可能であれば、明示的かつ冗長的にコーダーを宣言することにより、execute()を非静的に保つことが可能です。 –