私はcsvファイルを読んで、apache beam dataflowを使ってBigQueryに書きたいと思います。これを行うには、データをBigQueryに辞書形式で提示する必要があります。これを行うには、どうすればデータをapacheビームで変換できますか?apache beam dataflowでcsvを辞書に変換する方法
私の入力csvファイルには2つの列があり、次の2つの列テーブルをBigQueryで作成します。私はBigQueryでデータを作成する方法を知っています。それはまっすぐなものです。私が知らないのは、CSVを辞書に変換する方法です。以下のコードは正しくありませんが、私がしようとしていることのアイデアを与える必要があります。
# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
beam.io.BigQuerySink(
output_table,
schema='month:INTEGER, tornado_count:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
多くのありがとうパブロ、これは本当にうまくいきます!人々が完全性を追求している場合のコードスニペットは次のとおりです( | '太陽データを読み取る' >> beam.Read(CsvFileSource( './ sensor1_121116.csv')) | 'save' >> beam.Write .io.TextFileSink( './ greetings_solar'))) – user1753640
BigQueryに結果を書き込もうとしていますが、テーブルは作成されますがデータは取得されません。あなたは何が起こっているのか教えていただけますか?ここではスニペットがあります( | '読み込みソーラーデータ' >> beam.Read(CsvFileSource( './ sensor1_121116.csv')) | 'save' >> beam.Write( beam.io.BigQuerySink( output_table 、 スキーマ= 'lumosity:INTEGER、時間:INTEGER'、 create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED、 write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE))) – user1753640
user1753640 @:私は同じ問題を抱えていたとしていましたデータをGBQに格納する前に、スキーマと一致する辞書を使用してください。 – vdolez