Google Cloud Dataflowを使用していて、PCollectionのすべての要素にアクセスする必要があるParDo関数があります。これを達成するために、PCollectionをすべての要素の1つのIterableを含むPCollectionに変換したいと思いました。私は、クリーナー/シンプル/より速い解決策があるかどうか疑問に思っていました。PCollectionにPCollection <T>を組み合わせる簡単なアプローチ<Iterable<T>>
最初の方法は、ダミーのキーを作成し、GroupByKeyを実行し、その後に値を取得することでした。
PCollection<MyType> myData;
// AddDummyKey() outputs KV.of(1, context.element()) for everything
PCollection<KV<Integer, MyType>> myDataKeyed = myData.apply(ParDo.of(new AddDummyKey()));
// Group by dummy key
PCollection<KV<Integer, Iterable<MyType>>> myDataGrouped = myDataKeyed.apply(GroupByKey.create());
// Extract values
PCollection<Iterable<MyType>> myDataIterable = myDataGrouped.apply(Values.<Iterable<MyType>>create()
第2のアプローチは、ここに勧告続く:しかし、分別のないHow do I make View's asList() sortable in Google Dataflow SDK?を。私はView.asList()を作成し、ダミーのPCollectionを作成し、サイド入力としてのビューを持つダミーPCollectionにParDo関数を適用し、単純にビューを返しました。
PCollection<MyType> myData;
// Create view of the PCollection as a list
PCollectionView<List<MyType>> myDataView = myData.apply(View.asList());
// Create dummy PCollection
PCollection<Integer> dummy = pipeline.apply(Create.<Integer>of(1));
// Apply dummy ParDo that returns the view
PCollection<List<MyType>> myDataList = dummy.apply(
ParDo.withSideInputs(myDataView).of(new DoFn<Integer, List<MyType>>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.sideInput(myDataView));
}
}));
このタスクにはあらかじめ定義された結合機能があるようですが、見つけられません。助けてくれてありがとう!