2016-12-14 13 views
2

私はシンプルなDataFlow javaジョブを使用して、.csvファイルから数行を読み込みます。各行には数値セルが含まれています。このセルは、その行で特定の関数を実行する必要があるステップの数を表します。Google DataFlowパイプラインで並列Forループを作成する正しい方法

私は、関数内で従来のForループを使用して、その数値が非常に大きくなる場合には実行したくありません。並列フレンドリーなDataFlowメソドロジを使用してこれを行う正しい方法は何ですか? (その各々の数は、私はそのライン上で並列機能を実行する回数を表す)

public class SimpleJob{ 

    static class MyDoFn extends DoFn<String, Integer> { 

     public void processElement(ProcessContext c) { 
      String name = c.element().split("\\,")[0]; 
      int val = Integer.valueOf(c.element().split("\\,")[1]); 
      for (int i = 0; i < val; i++) // <- what's the preferred way to do this in DF? 
       System.out.println("Processing some function: " + name); // <- do something 
      c.output(val); 
     } 

    } 

    public static void main() { 

     DataflowPipelineOptions options = PipelineOptionsFactory 
       .as(DataflowPipelineOptions.class); 
     options.setProject(DEF.ID_PROJ); 
     options.setStagingLocation(DEF.ID_STG_LOC); 
     options.setRunner(DirectPipelineRunner.class); 

     Pipeline pipeline = Pipeline.create(options); 

     pipeline.apply(TextIO.Read.from("Source.csv")) 
       .apply(ParDo.of(new MyDoFn())); 

     pipeline.run(); 
    } 
} 

これは「source.csv」はどのように見えるかです::

はここで、現在のJavaコードです

ジョー、3
メアリー、4
ピーター、不思議なことに十分な2

答えて

3

が、これはSplittable DoFnのための動機ユースケースの一つです!このAPIは現在、大きく開発されています。そのAPIが利用可能になるまで

はしかし、あなたは基本的にそれはあなたのために行っているだろうかのほとんどを模倣することができます。

class ElementAndRepeats { String element; int numRepeats; } 
PCollection<String> lines = p.apply(TextIO.Read....) 
PCollection<ElementAndRepeats> elementAndNumRepeats = lines.apply(
    ParDo.of(...parse number of repetitions from the line...)); 
PCollection<ElementAndRepeats> elementAndNumSubRepeats = elementAndNumRepeats 
    .apply(ParDo.of(
     ...split large numbers of repetitions into smaller numbers...)) 
    .apply(...fusion break...); 
elementAndNumSubRepeats.apply(ParDo.of(...execute the repetitions...)) 

  • は「繰り返しの多数を分割」でありますDoFnは、ElementAndRepeats{"foo", 34}{ElementAndRepeats{"foo", 10}, ElementAndRepeats{"foo", 10}, ElementAndRepeats{"foo", 10}, ElementAndRepeats{"foo", 4}}
  • 融合ブレイクに分割します。hereを参照して、複数のParDoが融合して並列化されないようにします。
+0

あなたが提案したカスタムElementsAndRepeatsではなく、KV を試してみました。うまくできた。さらに2つの質問:(1)これを打ち破るためのミニループの適切な数は何ですか?あなたは10を示していますが、ベストプラクティスはありますか?数十万、おそらく何百万というオーダーのオリジナルの数字を想像してみてください。 (2)フュージョンブレークを行う方法の例はありますか?私はあなたのリンクを読むが、それははっきりしていないようだ。私はここに関連する質問に更新された完全なコードを投稿した:[リンク](http://stackoverflow.com/questions/41091713/sharing-bigtable-connection-object-among-dataflow-dofn-sub-classes) –

+0

1 - それ大したことではありません。各ElementAndRepeatsに関連するオーバーヘッドがあるため、「1」は低すぎるでしょう。あなたが余分な並列化をしないので、1Mは高すぎるでしょう。数十から数千のオーダのものはほぼ同じ性能を発揮します。 2 - それを行う1つの方法は次のとおりです:https://github.com/apache/incubator-beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io /jdbc/JdbcIO.java#L307 – jkff

関連する問題