通常、元のdict
データ要素をCSV形式のstring
表現に変換できる関数を作成すると便利です。
この関数は、DoFn
と書くことができます。この関数は、各コレクション要素を目的の形式に変換するデータのBeam PCollection
に適用できます。これを行うには、DoFn
をPCollection
にParDo
で送信します。このDoFn
をより使いやすいPTransform
にラップすることもできます。
あなたがここにBeam Programming Guide
にこのプロセスについての詳細を学ぶことができ、簡単な、翻訳可能な非ビームの例である:
['Here is a sentence,1', 'Another sentence goes here,2']
:
# Our example list of dictionary elements
test_input = [{'label': 1, 'text': 'Here is a sentence'},
{'label': 2, 'text': 'Another sentence goes here'}]
def convert_my_dict_to_csv_record(input_dict):
""" Turns dictionary values into a comma-separated value formatted string """
return ','.join(map(str, input_dict.values()))
# Our converted list of elements
converted_test_input = [convert_my_dict_to_csv_record(element) for element in test_input]
converted_test_input
は、次のようになりますが
ビームDictToCSV DoFnとPTransformの例を使用してDictWriter
from csv import DictWriter
from csv import excel
from cStringIO import StringIO
...
def _dict_to_csv(element, column_order, missing_val='', discard_extras=True, dialect=excel):
""" Additional properties for delimiters, escape chars, etc via an instance of csv.Dialect
Note: This implementation does not support unicode
"""
buf = StringIO()
writer = DictWriter(buf,
fieldnames=column_order,
restval=missing_val,
extrasaction=('ignore' if discard_extras else 'raise'),
dialect=dialect)
writer.writerow(element)
return buf.getvalue().rstrip(dialect.lineterminator)
class _DictToCSVFn(DoFn):
""" Converts a Dictionary to a CSV-formatted String
column_order: A tuple or list specifying the name of fields to be formatted as csv, in order
missing_val: The value to be written when a named field from `column_order` is not found in the input element
discard_extras: (bool) Behavior when additional fields are found in the dictionary input element
dialect: Delimiters, escape-characters, etc can be controlled by providing an instance of csv.Dialect
"""
def __init__(self, column_order, missing_val='', discard_extras=True, dialect=excel):
self._column_order = column_order
self._missing_val = missing_val
self._discard_extras = discard_extras
self._dialect = dialect
def process(self, element, *args, **kwargs):
result = _dict_to_csv(element,
column_order=self._column_order,
missing_val=self._missing_val,
discard_extras=self._discard_extras,
dialect=self._dialect)
return [result,]
class DictToCSV(PTransform):
""" Transforms a PCollection of Dictionaries to a PCollection of CSV-formatted Strings
column_order: A tuple or list specifying the name of fields to be formatted as csv, in order
missing_val: The value to be written when a named field from `column_order` is not found in an input element
discard_extras: (bool) Behavior when additional fields are found in the dictionary input element
dialect: Delimiters, escape-characters, etc can be controlled by providing an instance of csv.Dialect
"""
def __init__(self, column_order, missing_val='', discard_extras=True, dialect=excel):
self._column_order = column_order
self._missing_val = missing_val
self._discard_extras = discard_extras
self._dialect = dialect
def expand(self, pcoll):
return pcoll | ParDo(_DictToCSVFn(column_order=self._column_order,
missing_val=self._missing_val,
discard_extras=self._discard_extras,
dialect=self._dialect)
)
例を使用するには、PCollection
にあなたのtest_input
を入れて、PCollection
にDictToCSV
PTransform
を適用します。結果としてPCollection
に変換され、WriteToText
の入力として使用できます。 column_order
引数を使用して、辞書入力要素のキーに対応する列名のリストまたはタプルを指定する必要があります。結果のCSV形式の文字列は、指定された列名の順になります。また、この例の基礎となる実装はunicode
をサポートしていません。
おかげで可能ならばcsv.DictWriterを利用するために、より安全になります、アンドリュー! ConverDictToCSVFn()のようなものがApache-Beam内に存在していた場合や、最初から書き直さなければならないかどうか、私が思っていたことが分かりました。このタイプの関数を書くことは自明ではありません。なぜなら、センテンスにカンマ(またはセパレータが何であれ)が含まれていれば、通常、文全体を二重引用符で囲む必要があるからです。 Apache-Beam内にこれらのケースを処理するためのものは何もないことを示唆しています。 – reese0106
私は 'textio'がこの利便性を利用できるとは思わないから - これまでは、' csv'モジュールの 'DictWriter'とPythonの' StringIO'モジュールを組み合わせることでこれをPythonで実装できると思います。 –
あなたが提案していることについて、これ以上ガイダンスを提供できますか?おそらく別の答えでしょうか?私はDictWriterを使う方法を見つけることができなかったので、これは本当に私の質問の要点です。 – reese0106