2017-11-30 2 views
1

.CoGroupByKeyに入力するために必要なpCollectionsにpCollectionsを変換する方法についてのマニュアルがないが(ありません)テーブル行のPCollectionをPythonのkey、value PCollectionsに変換するにはどうすればよいですか?

コンテキスト 基本的に私は2つの大きなpCollectionsを持っていると私はタイプIIのために、両者の違いを見つけることができるようにする必要がありますETLが変更された場合(pColl1に存在しない場合はpColl2のネストされたフィールドに追加)、BigQueryからこれらのレコードの履歴を保持できるようになります。

パイプラインアーキテクチャ:dwskuと製品:

  1. 読むBQ表2 pCollectionsに。
  2. 返される2つのセットにCoGroupByKey()を適用します。 - >結果
  3. dwsku内のすべての変更を見つけてネストして製品に入れます。

何か助けが必要です。私は達成するのに必要なものと同じことを行うので、Python SDKには何もありません。

Convert from PCollection<TableRow> to PCollection<KV<K,V>>

Apacheのビーム、特にPythonのSDKのドキュメント/サポートはありますか?

答えて

1

CoGroupByKey()作業を取得するためには、あなたが最初の要素はキー及び第二のだろうここでtuplesPCollectionsを、持っている必要があります - データ

BigQuerySourceがあります。現在のApache Beamの出力では、PCollection of dictionariescode)という出力があり、すべてのエントリが読み込まれたテーブルの行を表しています。上記のように、このPCollectionをタプルにマップする必要があります。これはParDoを使用して行うのは簡単です:

class MapBigQueryRow(beam.DoFn): 
    def process(self, element, key_column): 
     key = element.get(key_column) 
     yield key, element 


data1 = (p 
      | "Read #1 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #1")) 
      | "Map #1 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_1")) 

data2 = (p 
      | "Read #2 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #2")) 
      | "Map #2 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_2")) 

co_grouped = ({"data1": data1, "data2": data2} | beam.CoGroupByKey()) 

# do your processing with co_grouped here 

ところで、Apacheのビーム用のPython SDKのドキュメントはhereを見つけることができます。

関連する問題