2017-05-11 5 views
1

データフローのパイプラインを構築する際に、一見単純な問題があります。私は外部ソースからデータをフェッチし、データを変換していくつかのBigQueryテーブルに書き込む複数のパイプラインを持っています。このプロセスが完了したら、ちょうど生成されたテーブルをクエリするクエリを実行したいと思います。理想的には、私はこれが同じ仕事で起こることを望みます。1つのデータフロージョブでBigQueryを書き込んだり読み込んだりする

これはデータフローの使用方法ですか、BigQueryへの読み込みとテーブルのクエリはジョブ間で分割する必要がありますか?

同じ仕事でこれが可能ならば、BigQuerySinkはPCollectionを生成しないので、どのようにこれを解決できますか?これが同じジョブでは不可能な場合は、別のジョブ(つまり、書き込みジョブと照会ジョブ)の完了時にジョブをトリガーする方法はありますか?

答えて

5

BigQuerySinkは、PCollectionを生成する必要があります。これは、単一のジョブでこれを行うために何が起こる必要があるかを暗示しています。たとえそれが空であっても、最初のシンクが完了するまでそのステップを待たせる方法でBigQueryから読み取るステップへの入力として使用することができます。

これを行うには、独自のバージョンのBigQuerySinkを作成する必要があります。

可能であれば、BigQueryに書き込んだテーブルを読み込むのではなく、BigQueryに書き込んだコレクションから2番目のステップを読み込むほうが簡単です。例:

PCollection<TableRow> rows = ...; 
rows.apply(BigQuery.Write.to(...)); 
rows.apply(/* rest of the pipeline */); 

テーブル行ではなくBigQueryに書き込まれた要素の処理を続行する場合は、これを先に行うこともできます。

+0

ありがとうございます! BiQuerySinkのインプリメンテーションを作成することは、2番目の答えが私の特定のケースに本当に適合しないため、私がやることです。 – selectle

+0

こんにちは@Supahsmooth、答えがあなたの問題を解決するのに役立つなら、それを投票することも検討してください:https://stackoverflow.com/help/why-vote。それはあなたを助ける人のために非常に便利です:) –

関連する問題