ここにあるのはです:私はGCS内に圧縮されており、.gzファイル拡張子(つまり、000000_ [0-5] .gz)をインポートしようとしています単一のBQテーブル私は現在までコマンドラインからコマンドを実行していましたが、Dataflowでこれを達成したいと思っていました。将来的にいくつかの変換が追加される可能性があります。データフローGCSからBQへの問題
圧縮されたGCSファイルのデータは、スキーマが頻繁に変更される複雑なJSON構造であるため、BQ内でJSF_EXTRACT関数を使用するのは、ファイル全体を1つの列のTSVとしてrecord
とするのが最も簡単です。必要な時に必要な値を解析します。
問題:このシナリオでは最小限のデータフローパイプラインを作成しました。 GCSから読み込んでBigQueryテーブルに書き込みます。私は、このパイプラインを実行すると、しかし、私はここに示され、JSONのパースエラーを取得しています:以下
Error while reading data, error message: JSON table encountered too
many errors, giving up. Rows: 1; errors: 1., error: Error while reading
data, error message: JSON table encountered too many errors, giving up.
Rows: 1; errors: 1., error: Error while reading data, error message:
JSON parsing error in row starting at position 2630029539: Value
encountered without start of object.
いくつかの変数を持つ私のデータフロースクリプトが匿名化されます。
from __future__ import absolute_import
import argparse
import logging
import re
import json
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.io import Read
from apache_beam.io import WriteToText
from apache_beam.io import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='gs://BUCKET_NAME/input-data/000000_0.gz',
help='Input file to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=GCP_PROJECT_NAME',
'--staging_location=gs://BUCKET_NAME/dataflow-staging',
'--temp_location=gs://BUCKET_NAME/dataflow-temp',
'--job_name=gcs-gzcomp-to-bq1',
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
(p | "ReadFromGCS" >> ReadFromText(known_args.input)
| WriteToBigQuery('TABLE_NAME', dataset='DATASET_NAME',
project='GCP_PROJECT_NAME', schema='record:string'))
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
あなたが見ることができるように、私は、文字列型で1列のみを含むスキーマを指定することによって、私は伝統的なロード・ジョブでやっているのと同じことを行うために試みたが、それはまだ失敗しています。
GCSファイルのインポート方法について、Dataflowに詳細を明示的に伝える方法はありますか?つまり、各行で有効なJSONオブジェクトであるにもかかわらずTSVを指定していますか?
また、このエラーが他の何かに関連している場合は、私もそれを電話してください。私はDataflowの新機能が非常に新しくなっていますが、BQ &の他のGCPツールもかなり経験していますので、これを私のツールベルトに追加したいと考えています。
ワウ;ありがとう、トン。とてもシンプルですが、完璧に働きました。 – andre622