BigQueryテーブルに結果を書き込むApache Beam/Dataflowパイプラインがあります。私はパイプラインの別の部分のためにこのテーブルを照会したいと思います。しかし、私はこのパイプラインの依存関係を適切に設定する方法を理解できないようです。私が書いた(そして次に照会したい)新しいテーブルは、いくつかのフィルタリングロジックのために別のテーブルに結合されたままになっているので、実際にテーブルを書いてからクエリを実行する必要があります。テーブル作成後のApache Beamパイプラインクエリテーブル
with beam.Pipeline(options=pipeline_options) as p:
table_data = p | 'CreatTable' >> # ... logic to generate table ...
# Write Table to BQ
table_written = table_data | 'WriteTempTrainDataBQ' >> beam.io.WriteToBigQuery(...)
query_results = table_written | 'QueryNewTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_new_table))
query_new_table
が実際に既存のBQテーブルのクエリであると私はquery_results = p |
代わりのtable_written
に変更した場合、これは正常に動作します:次のようにロジックは次のようになります。しかし、私がパイプラインの途中で書いているテーブルをクエリしようとすると、そのテーブルが実際に生成されるまでパイプラインステップを「待機」することはできません。これを行う方法はありますか?私は見落としていますか?
この手順を順次実行しようとすると、有効なPCollectionインスタンスではないため、table_written
が問題であることを意味するアサーションエラーassert isinstance(pbegin, pvalue.PBegin) AssertionError
が表示されています。
誰かが私がtable_writtenの代わりに置いて、実際にこれを必要に応じて順番に実行できるようにすることができますか?
非常に役に立ちます!私はこれを正しく理解していますが、ベストプラクティスは現在、1) 'beam.Pipeline(options = pipeline_options)as p1:'と書いてBQに書いた 'run()'と、 ) 'beam.Pipeline(options = pipeline_options)を' p2: 'とすると、BQから読み込むか、メインプログラム内で実際に2つの' run() '関数を持つように提案していますか? – reese0106
ビームに関する限り、それは重要ではありません:あなたのPythonスタイルの感覚によれば、あなたにはもっと読みやすいように見えますか? – jkff