2017-06-23 8 views
0

私のパイプラインの一部をパイプライン作成プロセスの動的入力に依存させたいと思っています。私の質問は、そうするための推奨方法は何ですか?Googleのデータフローで1.9.0のDoFnをJavaで使用する

私は(縮小)次のコードしている場合:

public static void createPipeline(){ 
    Pipeline p = TestPipeline.create(); 
    p.apply(new Source()).apply(new DoFunction()); 
    p.apply(new AnotherSource()).apply(new DoFunction()); 
    p.run; 
} 

を今DoFunctionは、パラメータでなければなりません。 これを一度インスタンス化して関数createPipelineに渡すか、クラスパラメータを使用してインスタンス化する必要がありますか?インスタンス化機能付き バージョン:クラスパラメータと

public static void createPipeline(DoFn dofn){ 
    Pipeline p = TestPipeline.create(); 
    p.apply(new Source()).apply(dofn); 
    p.apply(new AnotherSource()).apply(dofn); 
    p.run; 
} 

バージョン:Classを渡す必要はありません

public static void createPipeline(Class<?> fnClass){ 
    Pipeline p = TestPipeline.create(); 
    p.apply(new Source()).apply(fnClass.newInstance()); 
    p.apply(new AnotherSource()).apply(fnClass.newInstance()); 
    p.run; 
} 

答えて

0

- あなただけのDoFnを渡すことができます。

public static void createPipeline(DoFn<Foo, Baz> dofn) { 
    Pipeline pipeline = TestPipeline.create(); 

    pipeline 
     .apply(Read.from(new Source())) 
     .apply(ParDo.of(dofn)); 

    pipeline 
     .apply(Read.from(new AnotherSource())) 
     .apply(ParDo.of(dofn)); 

    pipeline.run(); 
} 

あなたも、完全にインスタンスParDo.of(doFn)複数回を変換し、それを適用を渡すことができます。

から返されたPCollectionを実際に使用しない場合は、入力をまとめて平準化することもできます。

関連する問題