Google BigQueryにプッシュしようとしているログがあります。私はGoogleのデータフローを使用してパイプライン全体を構築しようとしています。ログ構造は異なり、4つの異なるタイプに分類できます。私のパイプラインでは、PubSubからログを読み込み、それを解析してBigQueryテーブルに書き込みます。ログが書き込まれる必要があるテーブルは、ログの1つのパラメータに依存します。問題は、実行時にBigQueryIO.WriteのTableNameを変更する方法のポイントに固執しています。Googleのデータフロー入力に基づいて複数のテーブルに書き込む
答えて
サイド出力を使用できます。
https://cloud.google.com/dataflow/model/par-do#emitting-to-side-outputs-in-your-dofn
以下のサンプルコードは、BigQueryのテーブルを読み出して、3つの異なるPCollectionsでそれを分割します。それぞれのPCollectionsは別のPub/Subトピック(代わりに別のBigQueryテーブル)に送信されてしまいます。
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
PCollection<TableRow> weatherData = p.apply(
BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));
final TupleTag<String> readings2010 = new TupleTag<String>() {
};
final TupleTag<String> readings2000plus = new TupleTag<String>() {
};
final TupleTag<String> readingsOld = new TupleTag<String>() {
};
PCollectionTuple collectionTuple = weatherData.apply(ParDo.named("tablerow2string")
.withOutputTags(readings2010, TupleTagList.of(readings2000plus).and(readingsOld))
.of(new DoFn<TableRow, String>() {
@Override
public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception {
if (c.element().getF().get(2).getV().equals("2010")) {
c.output(c.element().toString());
} else if (Integer.parseInt(c.element().getF().get(2).getV().toString()) > 2000) {
c.sideOutput(readings2000plus, c.element().toString());
} else {
c.sideOutput(readingsOld, c.element().toString());
}
}
}));
collectionTuple.get(readings2010)
.apply(PubsubIO.Write.named("WriteToPubsub1").topic("projects/fh-dataflow/topics/bq2pubsub-topic1"));
collectionTuple.get(readings2000plus)
.apply(PubsubIO.Write.named("WriteToPubsub2").topic("projects/fh-dataflow/topics/bq2pubsub-topic2"));
collectionTuple.get(readingsOld)
.apply(PubsubIO.Write.named("WriteToPubsub3").topic("projects/fh-dataflow/topics/bq2pubsub-topic3"));
p.run();
サイド出力の場合でも、パイプラインが実行される前に、つまりコンパイル時にパイプラインを定義するときに、シンク名(例:BigQueryIO)を定義する必要はありませんか?私の理解では、OPはBigQueryテーブルに動的に書き込むことを望んでいます。その名前は実行時にしかわかりません。私はこれがどのように可能であるかはわかりません。たぶん私は何かが分かりにくいです!オハイオ州、右。 –
「ログが書き込まれる必要のあるテーブルがログの1つのパラメータに依存している」ということは、その名前が動的である必要があるか、または既知のセットの1つである必要があるかのいずれかです。うまくいけば、@nikhil sharmaはコメントできます。 –
「ログ構造が違っていて、4つの異なるタイプに分類することができる」ため、これはセットだと思います。 - これにより、ログを4つの異なるテーブルに分けることができます。 –
- 1. 番号に基づいて複数の配列に書き込む方法は?
- 2. REDUCERから複数のテーブルに出力を書き込む
- 3. 1つの入力に複数の値を書き込む
- 4. 複数の入力をテキストファイルに書き込む?
- 5. 複数行のユーザー入力をファイルに書き込む
- 6. プロジェクトの場所に基づいてファイルに書き込む
- 7. EditTextの内容に基づいてファイルに書き込む
- 8. WMI情報に基づいてレジストリに書き込むPowershellスクリプト
- 9. 1つの入力値に基づいて、入力テーブル
- 10. Pythonの変数に基づいてディレクトリにファイルを書き込む
- 11. 複数のテーブルに基づいてクエリを書く方法は?
- 12. SSIS Foreachループを使用して変数に基づいて特定のテーブルに書き込む方法
- 13. CSVセルPythonに基づいてXMLファイル名を書き込む
- 14. FileFormatに基づいてsparkカスタムデータソースを書き込む方法
- 15. Slick 3 - 他のテーブルに基づいて複数のテーブルにトランザクション的に挿入
- 16. Javaスクリプト複数のリダイレクト入力に基づいて
- 17. 複数のファイルを読み込んで複数のテーブルに書き込むバッチジョブ
- 18. DFSORTを使用して、条件に基づいて複数の行を1つに書き込む
- 19. 複数のコレクションのデータに基づいて更新コマンドを書き込むことはできますか?
- 20. Openxlsxパッケージ - 1つのシートに複数のテーブルを書き込む?
- 21. テキスト入力をExcelに書き込む
- 22. 複数のGoogle Cloud Storageバケットにデータを書き込む
- 23. SparkでDataFrameの複数の値に基づいて条件を書き込む方法
- 24. 複数のストリームに書き込むラッパー
- 25. 複数のファイルに書き込む
- 26. 複数のcsvファイルに書き込む
- 27. Pythonでの位置に基づいてファイルにデータを書き込む方法
- 28. 同じフォームを使用して複数のテーブルに複数の挿入を書き込む方法PHP
- 29. 不可能 - 複数行の出力をファイルに書き込む
- 30. 複数のフルHTML 'ファイル'をシングル出力ストリームに書き込む?
は、次のようになります。http://stackoverflow.com/questions/30431840/writing-results-of-google-dataflow-pipeline-into-mulitple-sinks、それはあなたを助けるかもしれない、それをチェックしてみてください。 – robosoul
@nikhil sharma - このソリューションを自分でハンドリングする必要がありますか?代わりにFluentdのようなものを使ってみましたか? http://www.fluentd.org/ –
@Grahamは答えに感謝します。 http://stackoverflow.com/questions/35979421/dynamic-table-name-when-writing-to-bq-from-dataflow-pipelinesとまったく同じように必要ですが、データフローではこれがサポートされていないことにご迷惑をおかけして申し訳ありません。私はFluentdを試してみませんでしたが、私たちはデータフローやその他のものにパイプラインを実装しようとしています。 Googleのパイプラインすべてで非常に必要なので、Googleにこの機能を含める計画があるのかどうか教えてください。また、私がこの機能を実装することによって貢献できるなら、私はそれをもっとうれしく思います。できれば教えてください。 –