0

Apache BeamのMinimalWordCount pythonの例を変更して、BigQueryテーブルから読み込もうとしています。私は次のような変更を加えました。BigQueryから読み取るMinimalWordCountの例を変更する

ここでオリジナル例:

with beam.Pipeline(options=pipeline_options) as p: 

    # Read the text file[pattern] into a PCollection. 
    lines = p | ReadFromText(known_args.input) 

    # Count the occurrences of each word. 
    counts = (
     lines 
     | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) 
         .with_output_types(unicode)) 
     | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) 
     | 'GroupAndSum' >> beam.CombinePerKey(sum)) 

    # Format the counts into a PCollection of strings. 
    output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c)) 

    # Write the output using a "Write" transform that has side effects. 
    # pylint: disable=expression-not-assigned 
    output | WriteToText(known_args.output) 

よりもむしろReadFromText私は、BigQueryのテーブルの列から読み取るために、これを調整しようとしています。私は次のコードでlines = p | ReadFromText(known_args.input)を交換したこれを行うには:

私はパイプラインを再実行
query = 'SELECT text_column FROM `bigquery.table.goes.here` ' 
lines = p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True)) 

、私はエラーを取得:「WARNING:root:A task failed with exception. expected string or buffer [while running 'Split']

は私が「分割」の動作が期待されていることを認識しますそれは明らかに文字列を取得していません。どのように 'ReadFromBigQuery'を修正して文字列/バッファを渡すことができますか? 'ReadFromBigQuery'の結果を文字列のバッファに変換するためのテーブルスキーマなどを用意する必要がありますか?

答えて

1

これは、BigQuerySourceが辞書のPCollection(辞書のすべてのキーが列を表す)の戻り値であるためです(dict)。あなたのケースのために最も簡単な方法は、ちょうどこのようbeam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True)beam.Mapを適用されます:あなたは、列名で問題が発生した場合

lines = (p 
|"ReadFromBigQuery" >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True)) 
| "Extract text column" >> beam.Map(lambda row: row.get("text_column")) 
     ) 

u"text_column"に変更してみてください。

またあなたスプリットがある列の値を抽出するために変換変更することができます。

'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x.get("text_column"))) 
         .with_output_types(unicode)) 
+0

ありがとう!私は実際に '' Split '>>(beam.FlatMap(lambda x:re.findall(r' [A-Za-z \ '] +'、x ["text_column" ).with_output_types(unicode)) 'これは単に構文の問題か、これがどのように動作するかに大きな違いがありますか? – reese0106

+0

それは構文ではなく、それらのソースのさまざまな動作です - 生のテキストを出力し、もう1つは辞書を出力します。あなたは何かを返すソースを持つことができます。 –

関連する問題