2017-10-28 9 views
0

BigQueryテーブルに結果を書き込むApache Beam/Dataflowパイプラインがあります。私はパイプラインの別の部分のためにこのテーブルを照会したいと思います。しかし、私はこのパイプラインの依存関係を適切に設定する方法を理解できないようです。私が書いた(そして次に照会したい)新しいテーブルは、いくつかのフィルタリングロジックのために別のテーブルに結合されたままになっているので、実際にテーブルを書いてからクエリを実行する必要があります。テーブル作成後のApache Beamパイプラインクエリテーブル

with beam.Pipeline(options=pipeline_options) as p: 
    table_data = p | 'CreatTable' >> # ... logic to generate table ... 

    # Write Table to BQ 
    table_written = table_data | 'WriteTempTrainDataBQ' >> beam.io.WriteToBigQuery(...) 

    query_results = table_written | 'QueryNewTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_new_table)) 

query_new_tableが実際に既存のBQテーブルのクエリであると私はquery_results = p |代わりのtable_writtenに変更した場合、これは正常に動作します:次のようにロジックは次のようになります。しかし、私がパイプラインの途中で書いているテーブルをクエリしようとすると、そのテーブルが実際に生成されるまでパイプラインステップを「待機」することはできません。これを行う方法はありますか?私は見落としていますか?

この手順を順次実行しようとすると、有効なPCollectionインスタンスではないため、table_writtenが問題であることを意味するアサーションエラーassert isinstance(pbegin, pvalue.PBegin) AssertionErrorが表示されています。

誰かが私がtable_writtenの代わりに置いて、実際にこれを必要に応じて順番に実行できるようにすることができますか?

答えて

2

「BigQuery書き込みが完了した後に何かする」というユースケースは現在Beamではサポートされていません。唯一の回避策は、別々のパイプラインを実行することです:メインプログラムを以下のようにしてください:BigQueryに書き込むパイプラインを実行してください。パイプラインが終了するのを待ちます。 BigQueryから読み込む別のパイプラインを実行します。

これは非常に頻繁に要求される機能であり、私たちはこのサポートを設計し始めています(より一般的には、その後のシーケンシング操作をサポートするためにさまざまなIO書き込みをアップグレードします)。

+0

非常に役に立ちます!私はこれを正しく理解していますが、ベストプラクティスは現在、1) 'beam.Pipeline(options = pipeline_options)as p1:'と書いてBQに書いた 'run()'と、 ) 'beam.Pipeline(options = pipeline_options)を' p2: 'とすると、BQから読み込むか、メインプログラム内で実際に2つの' run() '関数を持つように提案していますか? – reese0106

+0

ビームに関する限り、それは重要ではありません:あなたのPythonスタイルの感覚によれば、あなたにはもっと読みやすいように見えますか? – jkff

関連する問題