2017-07-31 22 views
0

は次のBigQueryをcsvファイルから読み込み、別のcsvファイルとに書き込むことになっているコードです:csvファイルを読み、BigQueryのテーブルにデータを記入

import argparse 
import logging 
import re 
import apache_beam as beam 
from apache_beam.io import ReadFromText 
from apache_beam.io import WriteToText 
from apache_beam.metrics import Metrics 
from apache_beam.metrics.metric import MetricsFilter 
from apache_beam.options.pipeline_options import PipelineOptions 
from apache_beam.options.pipeline_options import SetupOptions 
parser = argparse.ArgumentParser() 
parser.add_argument('--input', 
        dest='input', 
        default='gs://dataflow-samples/shakespeare/kinglear.txt', 
        help='Input file to process.') 
parser.add_argument('--output', 
        dest='output', 
        required=True, 
        help='Output file to write results to.') 
known_args, pipeline_args = parser.parse_known_args(None) 
pipeline_options = PipelineOptions(pipeline_args) 
pipeline_options.view_as(SetupOptions).save_main_session = True 
p = beam.Pipeline(options=pipeline_options) 
# Read the text file[pattern] into a PCollection. 
lines = p | 'read' >> ReadFromText(known_args.input) 
lines | beam.Map(lambda x: x.split(',')) 
lines | 'write' >> WriteToText(known_args.output) 
lines | 'write2' >> beam.io.Write(beam.io.BigQuerySink('xxxx:yyyy.aaaa')) 
# Actually run the pipeline (all operations above are deferred). 
result = p.run() 

書き込むことができます出力ファイルに、(XXXX:yyyy.aaaa)BigQueryの表に、そうすることはできません。

WARNING:root:A task failed with exception. 
'unicode' object has no attribute 'iteritems' 

をCSVファイルに含まれている表があり、次が表示されたメッセージである

書かれていないスキーマは同じで、BigQueryテーブルは空ですが、BigQueryに追加できます。私は、データがJSON形式に変換されなければならないため、これが原因だと思う。 このコードが正しく機能するためには、このコードに対して行わなければならない訂正は何ですか?これを動作させるために追加しなければならないコード行を教えてください。次の行を見ると

答えて

0

1: lines = p | 'read' >> ReadFromText(known_args.input) 
2: lines | beam.Map(lambda x: x.split(',')) 
3: lines | 'write' >> WriteToText(known_args.output) 
4: lines | 'write2' >> beam.io.Write(beam.io.BigQuerySink('xxxx:yyyy.aaaa')) 
  1. は、テキストファイルから読み込む行のPCollectionするlinesを定義します。
  2. 各行を分割して新しいPCollectionを作成します。しかし、それは実際にはPCollectionを保持しないので、効果的に何もしません。
  3. 元の行をテキストファイルに書き込みます(行ごとに1つの単語が表示されないように、各出力に元の行が1つずつ表示されます)。
  4. 入力から読み込んだ行をBigQueryファイルに書き込みます。

あなたはBigQuery tornadoes exampleを見ればあなたは(1)あなたは(2)あなたがBigQuerySinkにその辞書に一致するスキーマを提供する必要があり、各列の広告の分野で辞書に各行を変換する必要があることがわかります。例:

def to_table_row(x): 
    values = x.split(',') 
    return { 'field1': values[0], 'field2': values[1] } 

lines = p | 'read' >> ReadFromText(known_args.input) 
lines 
    | 'write' >> WriteToText(known_args.output) 
lines 
    | 'ToTableRows' >> beam.Map(to_table_row) 
    | 'write2' >> beam.io.Write(beam.io.BigQuerySink(
     'xxxx:yyyy.aaaa', 
     schema='field1:INTEGER, field2:INTEGER')) 
+0

これは部分的に機能しています。しかし、まだ1つの問題があります。私のスキーマには文字列があります。エラーは次のとおりです。 "'' str 'オブジェクトには属性' iteritems 'がありません。私の推測では、テーブルは文字列ではなく辞書として扱われるからです。これを解決する方法はありますか? – Nagaraju

関連する問題