私はXlsxファイルを読み込むコードを持っており、各行に対して特定の列のプロセスを実行します。Google DataFlow、コレクションを変換するときに外部webhookを待つ方法
問題は、データフローの「変換」部分に関連しています。私はリーダーから送られた値を取得する特定のメソッドを実装し、このデータは外部のサーバーに送られます。この外部サーバーはデータを処理し(数分かかる可能性があります)、その結果をPOSTリクエストします。 (POSTリクエストのURLが元の要求に指定されている
私の質問は以下の通りです:。?どのように私は外のプロセスは、()コールバックの外で行われたとき、私のパルド方法を通知することができます
をここでのマイこれまでコード:
import logging, argparse
import apache_beam as beam
from apache_beam.io import gcsio
from apache_beam.utils.options import PipelineOptions
from openpyxl import load_workbook
# @See https://cloud.google.com/dataflow/model/custom-io-python#ptransform-wrappers
class FileReader():
"""A file reader implementation"""
def __init__(self, path, *args, **kwargs):
self.path = path
def reader(self):
return XlsxFileReader(self.path)
class XlsxFileReader():
"""The Xlsx file reader"""
def __init__(self, path):
self.path = path
def _clean_value(self, value):
if value is None:
return None
value = unicode(value)
try:
value = value.encode('utf-8')
except UnicodeEncodeError:
pass
return value
def __iter__(self):
wb = load_workbook(filename=self.file, read_only=True)
sheet_name = wb.get_sheet_names()[0]
ws = wb[sheet_name]
for line, row in enumerate(ws.rows):
cell_value = self._clean_value(row[0].value)
if cell_value is not None and cell_value.find('@') > 0:
yield cell_value, line
break
def __enter__(self):
self.file = gcsio.GcsIO().open(self.path, 'r')
return self
def __exit__(self, *args, **kwargs):
self.file.close()
class ComputeWordLengthFn(beam.DoFn):
def process(self, context):
# Here, what I would need is send a request to an external API, that returns the result to the `callback` parameter.
# I know how to do that using requests
#
# ***********************************************************
# ---> BUT HOW can I know when that external service has done with my data and called back my `callback` url?
# ***********************************************************
yield context.element[0] is done once external service has made a request to the `callback` url on my instance.
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
default='gs://norbert-verify-staging/growthlist.xlsx',
help='Input file to process.'
)
parser.add_argument(
'--output',
dest='output',
required=True,
help='Output file to write results to.'
)
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
p = beam.Pipeline(options=pipeline_options)
p | 'read' >> beam.io.Read(FileReader(known_args.input)) \
| 'verify' >> beam.ParDo(ComputeWordLengthFn()) \
| 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
p.run()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
私はそれを明確に願っています、あなたはより多くの詳細が必要な場合は私に知らせて、私は完全にあなたの質問を理解したが、ビームがための方法を提供していますかどうかを尋ねるしているあなたのように思える場合