0

私はPython(2.7)を使用していて、GoogleのDataFlow環境内で作業していますが、Googleがまだすべてを完全にフラッシュしていないことは言うまでもありません。ただし、DataflowからBigQueryへの書き込み部分は、BigQuery Sinkに記載されています。Python BigQuery Dataflowシンクが、レコードをデータベースに挿入しないのはなぜですか?

文書によると、スキーマを指定するために、あなたが入力した文字列が必要です。「example_project_id::example_dataset_id.example_table_name」

schema = 'field_1:STRING, field_2:STRING, field_3:STRING, created_at:TIMESTAMP, updated_at:TIMESTAMP, field_4:STRING, field_5:STRING' 

テーブル名、プロジェクトIDとデータセットIDは、このようなものです

今、そのすべてが機能しています。以下のコードを見てくださいが、私は見ることができて、テーブルとフィールドを作成しています。注:プロジェクトIDは、関数の引数の一部として設定されています。私は物事がこれを使用して挿入得ることができるように

bq_data | beam.io.Write(
    "Write to BQ", beam.io.BigQuerySink(
     'example_dataset_id.{}'.format(bq_table_name), 
     schema=schema, 
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, 
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED 
    ) 
) 

今、それが見えます:

bq_data = pipeline | beam.Create(
    [{ 
     'field_1': 'ExampleIdentifier', 
     'field_2': 'ExampleValue', 
     'field_3': 'ExampleFieldValue', 
     'created_at': '2016-12-26T05:50:39Z', 
     'updated_at': '2016-12-26T05:50:39Z', 
     'field_4': 'ExampleDataIdentifier', 
     'field_5: 'ExampleData' 
    }] 
) 

しかしPCollectionに値を梱包する際、何らかの理由で、それはそれはBigQueryのに挿入するが、ときと言う

テーブルを照会すると、何も表示されません。

なぜ挿入されないのですか?エラーは表示されませんが、BigQueryには何も挿入されません。

これは、それがPCollectionに含まれているように、データが見えるもので、私が挿入して1100行の近くにあります。

{'field_1': 'ExampleIdentifier', 'field_2': 'ExampleValue', 'field_3': 'ExampleFieldValue', 'created_at': '2016-12-29 12:10:32', 'updated_at': '2016-12-29 12:10:32', 'field_4': 'ExampleDataIdentifier', 'field_5': 'ExampleData'} 

注:私は、日付の書式にチェックインし、上記の書式日付が許可されていますBigQueryの挿入用。

+0

私たちはこれを検討しています。 DirectPipelineRunnerを使用していますか(これがデフォルトです)。また、パイプラインの詳細やBigQueryでデータが利用できないことを確認する方法についても説明できます。 pipeline.run()を最後に呼び出してパイプラインを実行してください。 – chamikara

+0

ここには、サンプル情報を入れたデータフローを実行するコマンドのコピーがあります。python -m dataflow_sample --runner DirectPipelineRunner --setup_file ./setup.py --job_name sample-dataflow-run-1 --server dev- -worker_machine_type g1-small --num_workers 10 --start_date '2016-12-01' --end_date '2016-12-30' --devices device_id_1 device_id_2 device_id_3 – Jravict

+0

データの検証方法はBigQueryではありませんが、情報が必要な非常にテーブルに対してクエリを実行しています。もちろん、dataflow run()関数の最後にpipeline.run()を実行しています。 – Jravict

答えて

0

私はあなたの正確なスキーマと入力で例を試してみました。私は修正をしなければならなかった。

(1)引数にプロジェクトを指定していないようです。パイプライン定義内でこれを指定している可能性があります。これはエラーが表示されていないからです。 (2)上記のコードに誤字があります。 'field_5: 'ExampleData''field_5': 'ExampleData' にする必要がありますが、私はこれがあなたの元のパイプラインではなく、この問題の誤植であると仮定しています。

Dataflowの最新バージョンを実行していますか?新しい仮想環境を作成し、 'pip install google-cloud-dataflow'を実行して最新バージョンをインストールすることができます。

私は試してみるためにあなたの完全なピピラインを共有することは可能ですか?

「DirectPipelineRunner」を使用しているため、これをリモートでデバッグすることは困難です。 'DataflowPipelineRunner'を使用して同じパイプラインを実行することは可能ですか(このために請求が有効になっているGCPプロジェクトが必要です)。 'DataflowPipelineRunner'を使用してこれを実行し、ジョブIDを指定できる場合、ログを表示できます。

+0

プロジェクトは、引数である:--server、必要= Falseの--regionの--output、必要= Falseの** - **プロジェクト、必要= False - バケット、必須= False - 必須、必須= False - ステージングの場所、必須= False - temp_location、必須= False私は何かを持っていると仮定しています。--runner、required = False --setup_file、required = False --devices、nargs = "*"、required = False --start_date、必須= True --end_date、required = True – Jravict

+0

私は物事を遊んでいたのでGoogleの側で修正した。あなたの返答を見た後、私は自分が持っていたものに戻り、それは完全に働いた。私は問題が何だったのか分からないが、今挿入しているようだ。 – Jravict

関連する問題