2017-10-17 49 views
1

Python SDKを使用して有効な.CSVファイルとしてGCSに書きたい値の辞書があります。私は改行で区切られたテキストファイルとして辞書を書き出すことができますが、辞書を有効な.CSVに変換する例は見つけられないようです。データフローのパイプライン内でcsvを生成する最良の方法を誰にでも提案できますか?この回答はquestionです。CSVファイルからの読み込みですが、実際にはCSVファイルへの書き込みには対応していません。私は、CSVファイルはルールを持つテキストファイルだと認識していますが、データの辞書をWriteToTextを使用して書き込むことができるCSVに変換するのはまだ苦労しています。上記Google Cloud Dataflow辞書からCSVに書き込む

test_input = [{'label': 1, 'text': 'Here is a sentence'}, 
       {'label': 2, 'text': 'Another sentence goes here'}] 


test_input | beam.io.WriteToText(path_to_gcs) 

改行に各辞書を持っていたテキストファイルにつながる:ここ

は、私はCSVに変身したいと思います簡単な例辞書です。私が利用できるApache Beam内の機能はありますか(csv.DictWriterに似ていますか?)

答えて

2

通常、元のdictデータ要素をCSV形式のstring表現に変換できる関数を作成すると便利です。

この関数は、DoFnと書くことができます。この関数は、各コレクション要素を目的の形式に変換するデータのBeam PCollectionに適用できます。これを行うには、DoFnPCollectionParDoで送信します。このDoFnをより使いやすいPTransformにラップすることもできます。

あなたがここにBeam Programming Guide

にこのプロセスについての詳細を学ぶことができ、簡単な、翻訳可能な非ビームの例である:

['Here is a sentence,1', 'Another sentence goes here,2'] 

# Our example list of dictionary elements 
test_input = [{'label': 1, 'text': 'Here is a sentence'}, 
      {'label': 2, 'text': 'Another sentence goes here'}] 

def convert_my_dict_to_csv_record(input_dict): 
    """ Turns dictionary values into a comma-separated value formatted string """ 
    return ','.join(map(str, input_dict.values())) 

# Our converted list of elements 
converted_test_input = [convert_my_dict_to_csv_record(element) for element in test_input] 

converted_test_inputは、次のようになりますが

ビームDictToCSV DoFnとPTransformの例を使用してDictWriter

from csv import DictWriter 
from csv import excel 
from cStringIO import StringIO 

... 

def _dict_to_csv(element, column_order, missing_val='', discard_extras=True, dialect=excel): 
    """ Additional properties for delimiters, escape chars, etc via an instance of csv.Dialect 
     Note: This implementation does not support unicode 
    """ 

    buf = StringIO() 

    writer = DictWriter(buf, 
         fieldnames=column_order, 
         restval=missing_val, 
         extrasaction=('ignore' if discard_extras else 'raise'), 
         dialect=dialect) 
    writer.writerow(element) 

    return buf.getvalue().rstrip(dialect.lineterminator) 


class _DictToCSVFn(DoFn): 
    """ Converts a Dictionary to a CSV-formatted String 

     column_order: A tuple or list specifying the name of fields to be formatted as csv, in order 
     missing_val: The value to be written when a named field from `column_order` is not found in the input element 
     discard_extras: (bool) Behavior when additional fields are found in the dictionary input element 
     dialect: Delimiters, escape-characters, etc can be controlled by providing an instance of csv.Dialect 

    """ 

    def __init__(self, column_order, missing_val='', discard_extras=True, dialect=excel): 
     self._column_order = column_order 
     self._missing_val = missing_val 
     self._discard_extras = discard_extras 
     self._dialect = dialect 

    def process(self, element, *args, **kwargs): 
     result = _dict_to_csv(element, 
           column_order=self._column_order, 
           missing_val=self._missing_val, 
           discard_extras=self._discard_extras, 
           dialect=self._dialect) 

     return [result,] 

class DictToCSV(PTransform): 
    """ Transforms a PCollection of Dictionaries to a PCollection of CSV-formatted Strings 

     column_order: A tuple or list specifying the name of fields to be formatted as csv, in order 
     missing_val: The value to be written when a named field from `column_order` is not found in an input element 
     discard_extras: (bool) Behavior when additional fields are found in the dictionary input element 
     dialect: Delimiters, escape-characters, etc can be controlled by providing an instance of csv.Dialect 

    """ 

    def __init__(self, column_order, missing_val='', discard_extras=True, dialect=excel): 
     self._column_order = column_order 
     self._missing_val = missing_val 
     self._discard_extras = discard_extras 
     self._dialect = dialect 

    def expand(self, pcoll): 
     return pcoll | ParDo(_DictToCSVFn(column_order=self._column_order, 
              missing_val=self._missing_val, 
              discard_extras=self._discard_extras, 
              dialect=self._dialect) 
          ) 

例を使用するには、PCollectionにあなたのtest_inputを入れて、PCollectionDictToCSVPTransformを適用します。結果としてPCollectionに変換され、WriteToTextの入力として使用できます。 column_order引数を使用して、辞書入力要素のキーに対応する列名のリストまたはタプルを指定する必要があります。結果のCSV形式の文字列は、指定された列名の順になります。また、この例の基礎となる実装はunicodeをサポートしていません。

+0

おかげで可能ならばcsv.DictWriterを利用するために、より安全になります、アンドリュー! ConverDictToCSVFn()のようなものがApache-Beam内に存在していた場合や、最初から書き直さなければならないかどうか、私が思っていたことが分かりました。このタイプの関数を書くことは自明ではありません。なぜなら、センテンスにカンマ(またはセパレータが何であれ)が含まれていれば、通常、文全体を二重引用符で囲む必要があるからです。 Apache-Beam内にこれらのケースを処理するためのものは何もないことを示唆しています。 – reese0106

+0

私は 'textio'がこの利便性を利用できるとは思わないから - これまでは、' csv'モジュールの 'DictWriter'とPythonの' StringIO'モジュールを組み合わせることでこれをPythonで実装できると思います。 –

+0

あなたが提案していることについて、これ以上ガイダンスを提供できますか?おそらく別の答えでしょうか?私はDictWriterを使う方法を見つけることができなかったので、これは本当に私の質問の要点です。 – reese0106

0

アンドリューの提案に基づき、ここで私が作成したConvertDictToCSV機能である:

def ConvertDictToCSV(input_dict, fieldnames, separator=",", quotechar='"'): 
    value_list = [] 
    for field in fieldnames: 
    if input_dict[field]: 
     field_value = str(input_dict[field]) 
    else: 
     field_value = "" 
    if separator in field_value: 
     field_value = quotechar + field_value + quotechar 
    value_list.append(field_value) 

    return separator.join(value_list) 

これがうまく機能しているように見えるが、確か

+0

' field_value'に 'escotechar'が含まれていると問題が発生します。 –

関連する問題