私はPython SDKでGoogle Cloud Dataflowを使用しています。Google DataflowのPCollectionから要素のリストを取得し、それをパイプラインで使用してWrite Transformをループする方法はありますか?
私はしたいと思います:
- は、ループフィルタPCollections(ユニーク日付でそれぞれ)を作成し、書くことがそのリスト内の日付によって
- マスターPCollectionのうち、ユニークな日付のリストを取得します。各フィルタリングされたPCollectionは、BigQueryの時分割テーブルでパーティションに分割されます。
このリストはどのように入手できますか?次の結合変換の後、ListPCollectionViewオブジェクトを作成しましたが、そのオブジェクトを反復できません:
class ToUniqueList(beam.CombineFn):
def create_accumulator(self):
return []
def add_input(self, accumulator, element):
if element not in accumulator:
accumulator.append(element)
return accumulator
def merge_accumulators(self, accumulators):
return list(set(accumulators))
def extract_output(self, accumulator):
return accumulator
def get_list_of_dates(pcoll):
return (pcoll
| 'get the list of dates' >> beam.CombineGlobally(ToUniqueList()))
私はすべて間違っていますか?それをする最善の方法は何ですか?
ありがとうございました。