私はPython(2.7)を使用していて、GoogleのDataFlow環境内で作業していますが、Googleがまだすべてを完全にフラッシュしていないことは言うまでもありません。ただし、DataflowからBigQueryへの書き込み部分は、BigQuery Sinkに記載されています。Python BigQuery Dataflowシンクが、レコードをデータベースに挿入しないのはなぜですか?
文書によると、スキーマを指定するために、あなたが入力した文字列が必要です。「example_project_id::example_dataset_id.example_table_name」
schema = 'field_1:STRING, field_2:STRING, field_3:STRING, created_at:TIMESTAMP, updated_at:TIMESTAMP, field_4:STRING, field_5:STRING'
テーブル名、プロジェクトIDとデータセットIDは、このようなものです
今、そのすべてが機能しています。以下のコードを見てくださいが、私は見ることができて、テーブルとフィールドを作成しています。注:プロジェクトIDは、関数の引数の一部として設定されています。私は物事がこれを使用して挿入得ることができるように
bq_data | beam.io.Write(
"Write to BQ", beam.io.BigQuerySink(
'example_dataset_id.{}'.format(bq_table_name),
schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
今、それが見えます:
bq_data = pipeline | beam.Create(
[{
'field_1': 'ExampleIdentifier',
'field_2': 'ExampleValue',
'field_3': 'ExampleFieldValue',
'created_at': '2016-12-26T05:50:39Z',
'updated_at': '2016-12-26T05:50:39Z',
'field_4': 'ExampleDataIdentifier',
'field_5: 'ExampleData'
}]
)
しかしPCollectionに値を梱包する際、何らかの理由で、それはそれはBigQueryのに挿入するが、ときと言う
テーブルを照会すると、何も表示されません。なぜ挿入されないのですか?エラーは表示されませんが、BigQueryには何も挿入されません。
これは、それがPCollectionに含まれているように、データが見えるもので、私が挿入して1100行の近くにあります。
{'field_1': 'ExampleIdentifier', 'field_2': 'ExampleValue', 'field_3': 'ExampleFieldValue', 'created_at': '2016-12-29 12:10:32', 'updated_at': '2016-12-29 12:10:32', 'field_4': 'ExampleDataIdentifier', 'field_5': 'ExampleData'}
注:私は、日付の書式にチェックインし、上記の書式日付が許可されていますBigQueryの挿入用。
私たちはこれを検討しています。 DirectPipelineRunnerを使用していますか(これがデフォルトです)。また、パイプラインの詳細やBigQueryでデータが利用できないことを確認する方法についても説明できます。 pipeline.run()を最後に呼び出してパイプラインを実行してください。 – chamikara
ここには、サンプル情報を入れたデータフローを実行するコマンドのコピーがあります。python -m dataflow_sample --runner DirectPipelineRunner --setup_file ./setup.py --job_name sample-dataflow-run-1 --server dev- -worker_machine_type g1-small --num_workers 10 --start_date '2016-12-01' --end_date '2016-12-30' --devices device_id_1 device_id_2 device_id_3 – Jravict
データの検証方法はBigQueryではありませんが、情報が必要な非常にテーブルに対してクエリを実行しています。もちろん、dataflow run()関数の最後にpipeline.run()を実行しています。 – Jravict