2016-10-03 5 views
0

データフローを学習し始めました。このコード例のオートコンプリートを使用しています。私は、BigQueryのから読み取るしようとしていますが、私はこのエラーを取得しています:Google DataFlow - BigQueryから読んでいる例を自動完成

ERROR:root:Error while visiting split 
... 
File "/usr/lib/python2.7/re.py", line 181, in findall 
return _compile(pattern, flags).findall(string) 
TypeError: expected string or buffer [while running 'split'] 

コード:

def run(argv=None): 

parser = argparse.ArgumentParser() 
parser.add_argument('--output', 
        required=True, 
        help='Output file to write results to.') 
known_args, pipeline_args = parser.parse_known_args(argv) 

pipeline_options = PipelineOptions(pipeline_args) 
pipeline_options.view_as(SetupOptions).save_main_session = True 
p = beam.Pipeline(options=pipeline_options) 

(p # pylint: disable=expression-not-assigned 
| 'read' >> beam.io.Read(beam.io.BigQuerySource(input_table)) 
| 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) 
| 'TopPerPrefix' >> TopPerPrefix(5) 
| 'format' >> beam.Map(
    lambda (prefix, candidates): '%s: %s' % (prefix, candidates)) 
| 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))) 
p.run() 

私は任意のフィードバックを感謝しています。 Tks Tom

答えて

1

BigQuerySourceは、デフォルトで列から入力テーブルの値までの構造化レコードを返します。つまり、レコードの上にre.findallを直接実行することはできません。

代わりに、あなたが気に特定のフィールド、すなわちを抽出

| 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x['my_string_field'])) 
関連する問題