2016-12-15 41 views
3

私はcsvファイルを読んで、apache beam dataflowを使ってBigQueryに書きたいと思います。これを行うには、データをBigQueryに辞書形式で提示する必要があります。これを行うには、どうすればデータをapacheビームで変換できますか?apache beam dataflowでcsvを辞書に変換する方法

私の入力csvファイルには2つの列があり、次の2つの列テーブルをBigQueryで作成します。私はBigQueryでデータを作成する方法を知っています。それはまっすぐなものです。私が知らないのは、CSVを辞書に変換する方法です。以下のコードは正しくありませんが、私がしようとしていることのアイデアを与える必要があります。

# Standard imports 
import apache_beam as beam 
# Create a pipeline executing on a direct runner (local, non-cloud). 
p = beam.Pipeline('DirectPipelineRunner') 
# Create a PCollection with names and write it to a file. 
(p 
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv')) 
# How do you do this?? 
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v}) 
| 'save' >> beam.Write(
    beam.io.BigQuerySink(
    output_table, 
    schema='month:INTEGER, tornado_count:INTEGER', 
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, 
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))) 
p.run() 

答えて

8

考えられるのは、解析されたCSV行を返すソースを持つことです。 FileBasedSourceクラスをサブクラス化して、CSV解析を組み込むことで、これを行うことができます。私は最近、ApacheのビームのためのCsvFileSourceを書いた

class MyCsvFileSource(apache_beam.io.filebasedsource.FileBasedSource): 
    def read_records(self, file_name, range_tracker): 
    self._file = self.open_file(file_name) 

    reader = csv.reader(self._file) 

    for rec in reader: 
     yield rec 

:特に、read_records関数は次のようになります。 the Github repositoryをご覧ください。 pip install beam_utilsfrom beam_utils.sources import CsvFileSourceを使用して使用できます。 CsvFileSourceには、カスタムの区切り文字を設定するオプション、ファイルのヘッダーをスキップするオプション、および/またはリストの代わりに出力する辞書が含まれています。

+1

多くのありがとうパブロ、これは本当にうまくいきます!人々が完全性を追求している場合のコードスニペットは次のとおりです( | '太陽データを読み取る' >> beam.Read(CsvFileSource( './ sensor1_121116.csv')) | 'save' >> beam.Write .io.TextFileSink( './ greetings_solar'))) – user1753640

+1

BigQueryに結果を書き込もうとしていますが、テーブルは作成されますがデータは取得されません。あなたは何が起こっているのか教えていただけますか?ここではスニペットがあります( | '読み込みソーラーデータ' >> beam.Read(CsvFileSource( './ sensor1_121116.csv')) | 'save' >> beam.Write( beam.io.BigQuerySink( output_table 、 スキーマ= 'lumosity:INTEGER、時間:INTEGER'、 create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED、 write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE))) – user1753640

+0

user1753640 @:私は同じ問題を抱えていたとしていましたデータをGBQに格納する前に、スキーマと一致する辞書を使用してください。 – vdolez

0

パブロのポストの補足として、私は彼のサンプルに自分自身を少し変更したいと思います。 (あなたのための1!)に変更

reader = csv.reader(self._file)

csv.DictReaderreader = csv.DictReader(self._file)にはDictのキーとしてCSVファイルの最初の行を使用しています。他の行は、その値を持つ行ごとにdictを設定するために使用されます。右の値は、列の順序に基づいて正しいキーに自動的に配置されます。

Dict内のすべての値が文字列として格納されるという少し詳細があります。たとえば、BigQueryスキーマと競合する可能性があります。一部のフィールドではINTEGER。それでは、後で適切なキャスティングをする必要があります。

関連する問題