2016-12-07 4 views
1

私はXlsxファイルを読み込むコードを持っており、各行に対して特定の列のプロセスを実行します。Google DataFlow、コレクションを変換するときに外部webhookを待つ方法

問題は、データフローの「変換」部分に関連しています。私はリーダーから送られた値を取得する特定のメソッドを実装し、このデータは外部のサーバーに送られます。この外部サーバーはデータを処理し(数分かかる可能性があります)、その結果をPOSTリクエストします。 (POSTリクエストのURLが元の要求に指定されている

私の質問は以下の通りです:。?どのように私は外のプロセスは、()コールバックの外で行われたとき、私のパルド方法を通知することができます

をここでのマイこれまでコード:

import logging, argparse 
import apache_beam as beam 
from apache_beam.io import gcsio 
from apache_beam.utils.options import PipelineOptions 

from openpyxl import load_workbook 


# @See https://cloud.google.com/dataflow/model/custom-io-python#ptransform-wrappers 
class FileReader(): 
    """A file reader implementation""" 

    def __init__(self, path, *args, **kwargs): 
     self.path = path 

    def reader(self): 
     return XlsxFileReader(self.path) 


class XlsxFileReader(): 
    """The Xlsx file reader""" 
    def __init__(self, path): 
     self.path = path 

    def _clean_value(self, value): 
     if value is None: 
      return None 

     value = unicode(value) 

     try: 
      value = value.encode('utf-8') 
     except UnicodeEncodeError: 
      pass 

     return value 

    def __iter__(self): 
     wb = load_workbook(filename=self.file, read_only=True) 
     sheet_name = wb.get_sheet_names()[0] 
     ws = wb[sheet_name] 
     for line, row in enumerate(ws.rows): 
      cell_value = self._clean_value(row[0].value) 
      if cell_value is not None and cell_value.find('@') > 0: 
       yield cell_value, line 
       break 

    def __enter__(self): 
     self.file = gcsio.GcsIO().open(self.path, 'r') 
     return self 

    def __exit__(self, *args, **kwargs): 
     self.file.close() 


class ComputeWordLengthFn(beam.DoFn): 
    def process(self, context): 
     # Here, what I would need is send a request to an external API, that returns the result to the `callback` parameter. 
     # I know how to do that using requests 
     # 
     # *********************************************************** 
     # ---> BUT HOW can I know when that external service has done with my data and called back my `callback` url? 
     # *********************************************************** 
     yield context.element[0] is done once external service has made a request to the `callback` url on my instance. 


def run(argv=None): 
    parser = argparse.ArgumentParser() 
    parser.add_argument(
     '--input', 
     dest='input', 
     default='gs://norbert-verify-staging/growthlist.xlsx', 
     help='Input file to process.' 
    ) 
    parser.add_argument(
     '--output', 
     dest='output', 
     required=True, 
     help='Output file to write results to.' 
    ) 
    known_args, pipeline_args = parser.parse_known_args(argv) 

    pipeline_options = PipelineOptions(pipeline_args) 
    p = beam.Pipeline(options=pipeline_options) 

    p | 'read' >> beam.io.Read(FileReader(known_args.input)) \ 
     | 'verify' >> beam.ParDo(ComputeWordLengthFn()) \ 
     | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)) 

    p.run() 


if __name__ == '__main__': 
    logging.getLogger().setLevel(logging.INFO) 
    run() 

私はそれを明確に願っています、あなたはより多くの詳細が必要な場合は私に知らせて、私は完全にあなたの質問を理解したが、ビームがための方法を提供していますかどうかを尋ねるしているあなたのように思える場合

答えて

2

わかりません。一度呼び出されるDoFn.process()メソッド指定されたコールバックが呼び出されました。現在、Beamはそのような機能を提供していません。

ここでできることは、特定の要素の要求が完了するまでComputeWordLengthFn.process()メソッド内で待機することです(その待機を実行する正確な方法は外部APIによって異なります)。

あなたの質問に誤解があった場合はお知らせください。

関連する問題