2017-11-21 10 views
1

Google DataFlow(Apache Beamベース)Python SDKを使用して、私たちのパイプラインをテストする方法を理解しています。Pythonでビームパイプライン(Google Dataflow)をどのようにテストしますか?

https://beam.apache.org/documentation/pipelines/test-your-pipeline/ https://cloud.google.com/dataflow/pipelines/creating-a-pipeline-beam

上記のリンクは、Javaだけのためです。私はGoogleがJava Apacheのテストを指す理由についてかなり混乱しています。

私は2つのコレクションにCoGroupByKey結合の結果を表示したいと考えています。私はPythonの背景から来ており、Beam/Dataflowを使用する経験はほとんどありません。

本当に助けてもらえますか?私はこれがある程度開いていることを知っています。基本的に私はパイプライン内で結果を見ることができる必要があり、私のCoGroupByKey Joinの結果を見ることができません。

印刷されて何

#dwsku, product are PCollections coming from BigQuery. Nested Values as 
    #well in Product, but not dwsku 
    d1 = {'dwsku': dwsku, 'product': product} 
    results = d1 | beam.CoGroupByKey() 
    print results 

以下に、コード:あなたはあなたのマシン上でローカルにそれをテストしたい場合は

PCollection[CoGroupByKey/Map(_merge_tagged_vals_under_key).None] 
+0

を実行しているとき、私は完全に質問を得ることはありませんが、多分これは役立ちますクエリにLIMITを置く必要がありますか? https://beam.apache.org/get-started/quickstart-py/ –

答えて

1

、あなたがDirectRunnerを使用して起動する必要があり、その後、あなたはそれをデバッグすることができます - ログを出力するか、デバッガで実行を停止してください。

次の操作を行うことができ、ローカルに全体PCollectionを見るために:

d1 = {'dwsku': dwsku, 'product': product} 
results = d1 | beam.CoGroupByKey() 

def my_debug_function(pcollection_as_list): 
    # add a breakpoint in this function or just print 
    print pcollection_as_list 

debug = (results | beam.combiners.ToList() | beam.Map(my_debug_function)) 

ここで覚えておくべきいくつかのものがあります:

  • ToList()変換は、潜在的に多くのメモリを割り当てることができます
  • DirectRunnerを使用している間は、.wait_until_finish()パイプラインのメソッドを使用して、パイプラインの実行が終了する前にスクリプトが終了しないようにしてください。
  • のBigQueryからあなたのパイプラインのデータをダウンロードする場合は、ローカル
+0

pcollections(productとdwsku)が必要なキーの代わりにbigquery(tablerow)からまっすぐ来ているので、私はそのコードを動作させることができません。値の形式。これを解決するために何かできることはありますか? – codebrotherone

+0

私は問題が何かを得ることはできません...あなたの質問を精緻化したり更新したりできますか?それがPCollectionなら、それはうまくいくはずです。 –

関連する問題