2017-07-21 23 views
2

私はBeamパイプラインを使用して、一連の単語にSequenceMatcher関数を適用しようとしています。私は(うまくいけば)WriteToText部分を除いてすべてを考え出した。Apache-Beam + Python:JSON(または辞書)文字列を出力ファイルに書き込む

私は私のパイプラインは

class ProcessData(beam.DoFn): 
    def process(self, element, side_input): 

    ... Series of operations ... 

    return output_dictionary 

with beam.Pipeline(options=options) as p: 

    # Main input 
    main_input = p | 'ReadMainInput' >> beam.io.Read(
     beam.io.BigQuerySource(
      query=CUSTOM_SQL, 
      use_standard_sql=True 
     )) 

    # Side input 
    side_input = p | 'ReadSideInput' >> beam.io.Read(
     beam.io.BigQuerySource(
      project=PROJECT_ID, 
      dataset=DATASET, 
      table=TABLE 
     )) 

    output = (
     main_input 
     | 'ProcessData' >> beam.ParDo(
      ProcessDataDoFn(), 
      side_input=beam.pvalue.AsList(side_input)) 
     | 'WriteOutput' >> beam.io.WriteToText(GCS_BUCKET) 
    ) 

非常に簡単です。この1

{u'key': (u'string', float)} 

ようmain_inputとside_inputを取るカスタム(ここProcessDataDoFnと呼ばれる)パルド、プロセス、それらを出力辞書を定義しています問題は、このようにパイプラインを離れると、output_dictionaryのキーだけが出力されるということです。私はjson.dumps(ouput_dictionary)へProcessDataDoFnの復帰を変更する場合は、JSONはこれ

{ 
' 
k 
e 
y 
' 

: 

[ 
' 
s 
t 
r 
i 
n 
g 
' 

, 

f 
l 
o 
a 
t 
] 

どのように私が正しく出力結果のように正しく書かれていますか?

答えて

2

私は実際に問題を解決しました。

私が書いたParDoFnは、辞書またはJSON形式の文字列を返します。どちらの場合も、Beamが上記の入力で何かをしようとするときに問題が発生します。もしPCollectionが辞書であれば、Beanは与えられたPCollectionを繰り返し処理しているように見えます.PCollectionが文字列の場合、すべての文字を繰り返し処理します(その理由はJSONの出力が奇妙です)。私は解決策がむしろ単純であることを知っています。辞書または文字列をリストにカプセル化します。 JSONの書式設定の部分は、ParDoFnレベルで、またはあなたが示したようなTransformを介して行うことができます。

+1

Python API [ここ](https://beam.apache.org/documentation/sdks/pydoc/2.0.0/apache_beam.transforms.html#apache_beam.transforms.core.ParDo)に記載されているとおり: " DoFnは、入力PCollectionの の要素ごとにiterableを返す必要があることに注意してください。これを行う簡単な方法は、processメソッドでyieldキーワードを使用することです。 –

1

あなたの出力がそうであることは珍しいことです。 json.dumpsは、jsonを1行で表示する必要があります。行ごとにファイルに出力する必要があります。

おそらく、よりクリーンなコードを使用するために、必要に応じて書式設定を行う余分なマップ操作を追加できます。何かのように:

output = (
    main_input 
    | 'ProcessData' >> beam.ParDo(
     ProcessDataDoFn(), 
     side_input=beam.pvalue.AsList(side_input)) 
    | 'FormatOutput' >> beam.Map(json.dumps) 
    | 'WriteOutput' >> beam.io.WriteToText(GCS_BUCKET) 
) 
関連する問題