Apache BeamのMinimalWordCount pythonの例を変更して、BigQueryテーブルから読み込もうとしています。私は次のような変更を加えました。BigQueryから読み取るMinimalWordCountの例を変更する
ここでオリジナル例:
with beam.Pipeline(options=pipeline_options) as p:
# Read the text file[pattern] into a PCollection.
lines = p | ReadFromText(known_args.input)
# Count the occurrences of each word.
counts = (
lines
| 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
.with_output_types(unicode))
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| 'GroupAndSum' >> beam.CombinePerKey(sum))
# Format the counts into a PCollection of strings.
output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | WriteToText(known_args.output)
よりもむしろReadFromText
私は、BigQueryのテーブルの列から読み取るために、これを調整しようとしています。私は次のコードでlines = p | ReadFromText(known_args.input)
を交換したこれを行うには:
query = 'SELECT text_column FROM `bigquery.table.goes.here` '
lines = p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
、私はエラーを取得:「WARNING:root:A task failed with exception. expected string or buffer [while running 'Split']
」
は私が「分割」の動作が期待されていることを認識しますそれは明らかに文字列を取得していません。どのように 'ReadFromBigQuery'を修正して文字列/バッファを渡すことができますか? 'ReadFromBigQuery'の結果を文字列のバッファに変換するためのテーブルスキーマなどを用意する必要がありますか?
ありがとう!私は実際に '' Split '>>(beam.FlatMap(lambda x:re.findall(r' [A-Za-z \ '] +'、x ["text_column" ).with_output_types(unicode)) 'これは単に構文の問題か、これがどのように動作するかに大きな違いがありますか? – reese0106
それは構文ではなく、それらのソースのさまざまな動作です - 生のテキストを出力し、もう1つは辞書を出力します。あなたは何かを返すソースを持つことができます。 –