私はBeamパイプラインを使用して、一連の単語にSequenceMatcher関数を適用しようとしています。私は(うまくいけば)WriteToText部分を除いてすべてを考え出した。Apache-Beam + Python:JSON(または辞書)文字列を出力ファイルに書き込む
私は私のパイプラインは
class ProcessData(beam.DoFn):
def process(self, element, side_input):
... Series of operations ...
return output_dictionary
with beam.Pipeline(options=options) as p:
# Main input
main_input = p | 'ReadMainInput' >> beam.io.Read(
beam.io.BigQuerySource(
query=CUSTOM_SQL,
use_standard_sql=True
))
# Side input
side_input = p | 'ReadSideInput' >> beam.io.Read(
beam.io.BigQuerySource(
project=PROJECT_ID,
dataset=DATASET,
table=TABLE
))
output = (
main_input
| 'ProcessData' >> beam.ParDo(
ProcessDataDoFn(),
side_input=beam.pvalue.AsList(side_input))
| 'WriteOutput' >> beam.io.WriteToText(GCS_BUCKET)
)
非常に簡単です。この1
{u'key': (u'string', float)}
ようmain_inputとside_inputを取るカスタム(ここProcessDataDoFnと呼ばれる)パルド、プロセス、それらを出力辞書を定義しています問題は、このようにパイプラインを離れると、output_dictionaryのキーだけが出力されるということです。私はjson.dumps(ouput_dictionary)へProcessDataDoFnの復帰を変更する場合は、JSONはこれ
{
'
k
e
y
'
:
[
'
s
t
r
i
n
g
'
,
f
l
o
a
t
]
どのように私が正しく出力結果のように正しく書かれていますか?
Python API [ここ](https://beam.apache.org/documentation/sdks/pydoc/2.0.0/apache_beam.transforms.html#apache_beam.transforms.core.ParDo)に記載されているとおり: " DoFnは、入力PCollectionの の要素ごとにiterableを返す必要があることに注意してください。これを行う簡単な方法は、processメソッドでyieldキーワードを使用することです。 –