2017-05-30 5 views
1

現在、データフロー(Apache Beam、Python SDK)タスクを実行して> 100GBツイートファイルをBigQueryにインポートしようとしていますが、実行中Error: Message: Too many sources provided: 15285. Limit is 10000.エラー:メッセージ:提供されているソースが多すぎます:15285。制限は10000です

タスクはツイート(JSON)を受け取り、関連する5つのフィールドを抽出し、一部の変換でビットを変換/サニタイズし、それらの値をBigQueryに書き込みます。

Cloud Dataflow to BigQuery - too many sourcesがありますが、1つの入力ファイルしか持っていないのに、入力ファイルが多すぎるために発生していると思われます。また、そこに記載されている解決策はかなりわかりにくいですし、もし私が私の問題にそれらを適用できるかどうかわかりません。

私の推測では、BigQueryは各行または何かの一時ファイルを永続化する前に書き込むのですが、それは「あまりにも多くのソース」の意味ですか?

どうすればこの問題を解決できますか?

[編集]

コード:

import argparse 
import json 
import logging 

import apache_beam as beam 

class JsonCoder(object): 
    """A JSON coder interpreting each line as a JSON string.""" 

    def encode(self, x): 
     return json.dumps(x) 

    def decode(self, x): 
     return json.loads(x) 

def filter_by_nonempty_county(record): 
    if 'county_fips' in record and record['county_fips'] is not None: 
     yield record 

def run(argv=None): 

    parser = argparse.ArgumentParser() 
    parser.add_argument('--input', 
         default='...', 
         help=('Input twitter json file specified as: ' 
          'gs://path/to/tweets.json')) 
    parser.add_argument(
     '--output', 
     required=True, 
     help= 
     ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE ' 
     'or DATASET.TABLE.')) 

    known_args, pipeline_args = parser.parse_known_args(argv) 



    p = beam.Pipeline(argv=pipeline_args) 

    # read text file 

    #Read all tweets from given source file 
    read_tweets = "Read Tweet File" >> beam.io.ReadFromText(known_args.input, coder=JsonCoder()) 

    #Extract the relevant fields of the source file 
    extract_fields = "Project relevant fields" >> beam.Map(lambda row: {'text': row['text'], 
                    'user_id': row['user']['id'], 
                    'location': row['user']['location'] if 'location' in row['user'] else None, 
                    'geo':row['geo'] if 'geo' in row else None, 
                    'tweet_id': row['id'], 
                    'time': row['created_at']}) 


    #check what type of geo-location the user has 
    has_geo_location_or_not = "partition by has geo or not" >> beam.Partition(lambda element, partitions: 0 if element['geo'] is None else 1, 2) 


    check_county_not_empty = lambda element, partitions: 1 if 'county_fips' in element and element['county_fips'] is not None else 0 

    #tweet has coordinates partition or not 
    coordinate_partition = (p 
      | read_tweets 
      | extract_fields 
      | beam.ParDo(TimeConversion()) 
      | has_geo_location_or_not) 


    #lookup by coordinates 
    geo_lookup = (coordinate_partition[1] | "geo coordinates mapping" >> beam.ParDo(BeamGeoLocator()) 
          | "filter successful geo coords" >> beam.Partition(check_county_not_empty, 2)) 

    #lookup by profile 
    profile_lookup = ((coordinate_partition[0], geo_lookup[0]) 
         | "join streams" >> beam.Flatten() 
         | "Lookup from profile location" >> beam.ParDo(ComputeLocationFromProfile()) 
        ) 


    bigquery_output = "write output to BigQuery" >> beam.io.Write(
     beam.io.BigQuerySink(known_args.output, 
        schema='text:STRING, user_id:INTEGER, county_fips:STRING, tweet_id:INTEGER, time:TIMESTAMP, county_source:STRING', 
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, 
       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) 

    #file_output = "write output" >> beam.io.WriteToText(known_args.output, coder=JsonCoder()) 


    output = ((profile_lookup, geo_lookup[1]) | "merge streams" >> beam.Flatten() 
       | "Filter entries without location" >> beam.FlatMap(filter_by_nonempty_county) 
       | "project relevant fields" >> beam.Map(lambda row: {'text': row['text'], 
                    'user_id': row['user_id'], 
                    'county_fips': row['county_fips'], 
                    'tweet_id': row['tweet_id'], 
                    'time': row['time'], 
                    'county_source': row['county_source']}) 
       | bigquery_output) 

    result = p.run() 
    result.wait_until_finish() 

if __name__ == '__main__': 
    logging.getLogger().setLevel(logging.DEBUG) 
    run() 

それは少し複雑ですので、それはおそらく直接BigQueryの中でそれを行うには、あまりにも多くの時間がかかります。このコードは、つぶやきjsonを読み込み、ジオタグ付けされているかどうかでPCollectionを分割します。プロファイルされていなければPCollectionを分割し、GIS解析に関連する場所にマップし、BigQueryに書き込みます。

+0

あなたのコードを共有できますか?また、Beamを使用して変換する必要があります。つまり、BigQueryでファイルを読み込んだり、ファイルをGCSに読み込んだり、BigQueryをポイントしたり、途中で変換したりすることができますか。 –

+0

編集でコードを追加しました – Zenon

+0

使用しているSDKのバージョンはどれですか? –

答えて

1

ファイルの数は、これを削減する

1つのトリックは、いくつかのランダムなキーを生成することである。要素がで処理された破片の数に対応し、グループにそれらを書き出す前に、それに基づいて要素。

たとえば、あなたはあなたのパイプラインで、次のDoFnPTransformを使用することができます。

class _RoundRobinKeyFn(beam.DoFn): 
    def __init__(self, count): 
    self.count = count 

    def start_bundle(self): 
    self.counter = random.randint(0, self.count - 1) 

    def process(self, element): 
    self.counter += 1 
    if self.counter >= self.count: 
     self.counter -= self.count 
    yield self.counter, element 

class LimitBundles(beam.PTransform): 
    def __init__(self, count): 
    self.count = count 

    def expand(self, input): 
    return input 
     | beam.ParDo(_RoundRobinKeyFn(this.count)) 
     | beam.GroupByKey() 
     | beam.FlatMap(lambda kv: kv[1]) 

あなただけbigquery_output前にこれを使用します。

output = (# ... 
     | LimitBundles(10000) 
     | bigquery_output) 

は(私はこの中に入力したことに注意してくださいテストすることなく、Pythonのタイプミスが発生する可能性があります)。

関連する問題