2016-11-23 4 views
2

excessive fusionを防ぐためにReshuffle変換を実装しようとしていますが、単純なPCollectionsを処理するために<KV<String,String>>のバージョンを変更する方法はわかりません。 (hereに記載されている方法PCollection <KV<String,String>>シャッフルする。)PCollectionを再シャッフルする方法<T>?

をどのように私は私のパイプラインで複数の工程を追加する前にexample codeを改造する公式のアブロI/Oを拡張するのでしょうか? Googleのサポートチームが提供するコードスニペットに

PipelineOptions options = PipelineOptionsFactory.create(); 
Pipeline p = Pipeline.create(options); 

Schema schema = new Schema.Parser().parse(new File("schema.avsc")); 

PCollection<GenericRecord> records = 
    p.apply(AvroIO.Read.named("ReadFromAvro") 
     .from("gs://my_bucket/path/records-*.avro") 
     .withSchema(schema)); 

答えて

3

おかげで、私はそれを考え出した:再分割クラスを使用

PCollection<T> reshuffled = data.apply(Repartition.of()); 

import com.google.cloud.dataflow.sdk.transforms.DoFn; 
import com.google.cloud.dataflow.sdk.transforms.GroupByKey; 
import com.google.cloud.dataflow.sdk.transforms.PTransform; 
import com.google.cloud.dataflow.sdk.transforms.ParDo; 
import com.google.cloud.dataflow.sdk.values.KV; 
import com.google.cloud.dataflow.sdk.values.PCollection; 
import java.util.concurrent.ThreadLocalRandom; 

public class Repartition<T> extends PTransform<PCollection<T>, PCollection<T>> { 

    private Repartition() {} 

    public static <T> Repartition<T> of() { 
     return new Repartition<T>(); 
    } 

    @Override 
    public PCollection<T> apply(PCollection<T> input) { 
     return input 
       .apply(ParDo.named("Add arbitrary keys").of(new AddArbitraryKey<T>())) 
       .apply(GroupByKey.<Integer, T>create()) 
       .apply(ParDo.named("Remove arbitrary keys").of(new RemoveArbitraryKey<T>())); 
    } 

    private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> { 
     @Override 
     public void processElement(ProcessContext c) throws Exception { 
      c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element())); 
     } 
    } 

    private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> { 
     @Override 
     public void processElement(ProcessContext c) throws Exception { 
      for (T s : c.element().getValue()) { 
       c.output(s); 
      } 
     } 
    } 
} 

が入れ替えPCollectionを取得するには

+0

あなたは 'AddArbitaryKey'について詳しく説明できますか?なぜそれが必要なのか、そして 'AddArbitraryKey'の特定の実装が重要なのですか?それはキースペースがワーカーに分散される方法に影響を与えますか? – harveyxia

+0

'Redistribution'トランスフォーム(https://github.com/apache/incubator-beam/pull/1036参照)と同様に、任意の方法で再配布する必要があります。無作為に選ばれた整数キーはランダムな分布につながるはずです。 – Tobi

+0

ありがとう、 'Redistribution'のユースケースは? – harveyxia

関連する問題