0

ここにあるのはです:私はGCS内に圧縮されており、.gzファイル拡張子(つまり、000000_ [0-5] .gz)をインポートしようとしています単一のBQテーブル私は現在までコマンドラインからコマンドを実行していましたが、Dataflowでこれを達成したいと思っていました。将来的にいくつかの変換が追加される可能性があります。データフローGCSからBQへの問題

圧縮されたGCSファイルのデータは、スキーマが頻繁に変更される複雑なJSON構造であるため、BQ内でJSF_EXTRACT関数を使用するのは、ファイル全体を1つの列のTSVとしてrecordとするのが最も簡単です。必要な時に必要な値を解析します。

問題:このシナリオでは最小限のデータフローパイプラインを作成しました。 GCSから読み込んでBigQueryテーブルに書き込みます。私は、このパイプラインを実行すると、しかし、私はここに示され、JSONのパースエラーを取得しています:以下

Error while reading data, error message: JSON table encountered too 
many errors, giving up. Rows: 1; errors: 1., error: Error while reading 
data, error message: JSON table encountered too many errors, giving up. 
Rows: 1; errors: 1., error: Error while reading data, error message: 
JSON parsing error in row starting at position 2630029539: Value 
encountered without start of object. 

いくつかの変数を持つ私のデータフロースクリプトが匿名化されます。

from __future__ import absolute_import 

import argparse 
import logging 
import re 
import json 

import apache_beam as beam 
from apache_beam.io import ReadFromText 
from apache_beam.io import WriteToText 
from apache_beam.io import Read 
from apache_beam.io import WriteToText 
from apache_beam.io import WriteToBigQuery 
from apache_beam.options.pipeline_options import PipelineOptions 
from apache_beam.options.pipeline_options import SetupOptions 

def run(argv=None): 

    parser = argparse.ArgumentParser() 
    parser.add_argument('--input', 
         dest='input', 
         default='gs://BUCKET_NAME/input-data/000000_0.gz', 
         help='Input file to process.') 
    known_args, pipeline_args = parser.parse_known_args(argv) 
    pipeline_args.extend([ 
     '--runner=DataflowRunner', 
     '--project=GCP_PROJECT_NAME', 
     '--staging_location=gs://BUCKET_NAME/dataflow-staging', 
     '--temp_location=gs://BUCKET_NAME/dataflow-temp', 
     '--job_name=gcs-gzcomp-to-bq1', 
    ]) 

    pipeline_options = PipelineOptions(pipeline_args) 
    pipeline_options.view_as(SetupOptions).save_main_session = True 
    with beam.Pipeline(options=pipeline_options) as p: 

    (p | "ReadFromGCS" >> ReadFromText(known_args.input) 
     | WriteToBigQuery('TABLE_NAME', dataset='DATASET_NAME', 
      project='GCP_PROJECT_NAME', schema='record:string')) 

if __name__ == '__main__': 
    logging.getLogger().setLevel(logging.INFO) 
    run() 

あなたが見ることができるように、私は、文字列型で1列のみを含むスキーマを指定することによって、私は伝統的なロード・ジョブでやっているのと同じことを行うために試みたが、それはまだ失敗しています。

GCSファイルのインポート方法について、Dataflowに詳細を明示的に伝える方法はありますか?つまり、各行で有効なJSONオブジェクトであるにもかかわらずTSVを指定していますか?

また、このエラーが他の何かに関連している場合は、私もそれを電話してください。私はDataflowの新機能が非常に新しくなっていますが、BQ &の他のGCPツールもかなり経験していますので、これを私のツールベルトに追加したいと考えています。

答えて

0

WriteToBigQueryへの入力コレクションは、文字列のコレクションではなく、辞書のコレクション(各キーはBigQuery列にマップされる)である必要があります。 | beam.Map(lambda line: dict(record=line))のようなものを通過してみてください。

+0

ワウ;ありがとう、トン。とてもシンプルですが、完璧に働きました。 – andre622

関連する問題