2016-11-16 9 views
2

〜800.000ファイルで大きな変換を実行しようとすると、パイプラインを実行しようとすると上記のエラーメッセージが表示されます。ここでGoogle Dataflow:要求ペイロードサイズが上限を超えています:10485760バイト

コードです:

public static void main(String[] args) { 
Pipeline p = Pipeline.create(
    PipelineOptionsFactory.fromArgs(args).withValidation().create());  
    GcsUtil u = getUtil(p.getOptions()); 

    try{ 
     List<GcsPath> paths = u.expand(GcsPath.fromUri("gs://tlogdataflow/stage/*.zip")); 
     List<String> strPaths = new ArrayList<String>(); 
     for(GcsPath pa: paths){ 
      strPaths.add(pa.toUri().toString()); 
     }   

     p.apply(Create.of(strPaths)) 
     .apply("Unzip Files", Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox"))); 
     p.run(); 
    } 
    catch(IOException io){ 
     // 
    } 

}

私はGoogleのデータフローがためであるまさにザッツと思いましたか?大量のファイル/データの処理?

ロードを分割して動作させる方法はありますか?

おかげ& BR

フィル

答えて

3

データフローは、大量のデータを扱うのが得意ですが、パイプラインの記述がいかに大きなの面で制限があります。 Create.of()に渡されたデータはパイプラインの説明に埋め込まれているため、大量のデータを渡すことはできません。大量のデータを外部ストレージから読み取る必要があり、パイプラインではその位置のみを指定する必要があります。

プログラムが処理できるデータの量とプログラムのコード自体のサイズの違いと考えることができます。

p.apply(Create.of("gs://tlogdataflow/stage/*.zip")) 
.apply(ParDo.of(new ExpandFn())) 
.apply(...fusion break (see below)...) 
.apply(Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox"))) 

ExpandFnは、次のようにのようなものです::

あなたが拡大することによって、この問題を回避することができますがParDoで起こる

private static class ExpandFn extends DoFn<String, String> { 
    @ProcessElement 
    public void process(ProcessContext c) { 
    GcsUtil util = getUtil(c.getPipelineOptions()); 
    for (String path : util.expand(GcsPath.fromUri(c.element()))) { 
     c.output(path); 
    } 
    } 
} 

融合ブレークによってアイムthis(基本的には、ParDo(add unique key) + group by key + Flatten.iterables() + Values.create())を参照してください。非常に便利ではありませんし、これを行う組み込み変換を追加することについての議論があります(this PRthis threadを参照)。

1

ありがとうございました!あなたの入力を使って、私はこれを次のように解決しました:

public class ZipPipeline { 
private static final Logger LOG = LoggerFactory.getLogger(ZipPipeline.class); 

public static void main(String[] args) { 
Pipeline p = Pipeline.create(
    PipelineOptionsFactory.fromArgs(args).withValidation().create());  

    try{ 
     p.apply(Create.of("gs://tlogdataflow/stage/*.zip")) 
     .apply(ParDo.of(new ExpandFN())) 
     .apply(ParDo.of(new AddKeyFN())) 
     .apply(GroupByKey.<String,String>create()) 
     .apply(ParDo.of(new FlattenFN())) 
     .apply("Unzip Files", Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox"))); 
     p.run(); 

    } 
    catch(Exception e){ 
     LOG.error(e.getMessage()); 
    } 

} 

private static class FlattenFN extends DoFn<KV<String,Iterable<String>>, String>{ 
    private static final long serialVersionUID = 1L; 
    @Override 
    public void processElement(ProcessContext c){ 
     KV<String,Iterable<String>> kv = c.element(); 
     for(String s: kv.getValue()){ 
      c.output(s); 
     } 


     } 

    } 

private static class ExpandFN extends DoFn<String,String>{ 
private static final long serialVersionUID = 1L; 

@Override 
    public void processElement(ProcessContext c) throws Exception{ 
     GcsUtil u = getUtil(c.getPipelineOptions()); 
     for(GcsPath path : u.expand(GcsPath.fromUri(c.element()))){ 
      c.output(path.toUri().toString()); 
     } 
    } 
} 

private static class AddKeyFN extends DoFn<String, KV<String,String>>{ 
    private static final long serialVersionUID = 1L; 
    @Override 
    public void processElement(ProcessContext c){ 
    String path = c.element(); 
    String monthKey = path.split("_")[4].substring(0, 6); 
    c.output(KV.of(monthKey, path)); 
    } 
} 
関連する問題