2016-11-22 7 views
1

現在、Python Apache Beamパイプラインが動作しており、ローカルで実行することができます。現在、パイプラインをGoogle Cloud Dataflow上で実行し、完全に自動化していますが、Dataflow/Apache Beamのパイプライン監視には制限があります。Python Apache BeamパイプラインステータスAPIコール

現在のところ、Cloud Dataflowには、UIインターフェイスまたはコマンドラインのgcloudを通じて、パイプラインステータスを監視する2つの方法があります。これらのソリューションは両方とも、無損失のファイル処理を考慮に入れることができる完全自動化ソリューションには適していません。

Apache Beamのgithubには、internal/apiclient.pyというファイルがあり、ジョブの状態を取得するための関数があることを示しています。get_job

get_jobが使用されたことがわかった1つのインスタンスはrunners/dataflow_runner.pyです。

最終的な目標は、このAPIを使用して、自動的に実行されるジョブの状態を取得して、最終的にすべてがパイプラインを通じて正常に処理されるようにすることです。

パイプライン(p.run())を実行した後、このAPIをどのように使用することができますか?私たちはにrunnerがどこから来るのか理解していません。

誰かがこのAPIコールにアクセスする方法を理解してくれれば、パイプラインをセットアップ/実行することができます。

答えて

1

私はちょうどコードを覗き込み、仕事の詳細を取得する方法を見つけました。私たちの次のステップは、すべての仕事のリストを取得する方法があるかどうかを確認することです。

# start the pipeline process 
pipeline     = p.run() 
# get the job_id for the current pipeline and store it somewhere 
job_id     = pipeline.job_id() 
# setup a job_version variable (either batch or streaming) 
job_version    = dataflow_runner.DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION 
# setup "runner" which is just a dictionary, I call it local 
local     = {} 
# create a dataflow_client 
local['dataflow_client'] = apiclient.DataflowApplicationClient(pipeline_options, job_version) 
# get the job details from the dataflow_client 
print local['dataflow_client'].get_job(job_id) 
関連する問題