2017-10-11 34 views

答えて

1

PCollectionのサイズを確認する方法はありませんPCollectionはJava SDKなどの典型的なCollectionと似ていないため、その上にPTransform(Count.globally()やCombine.combineFn()など)を適用する必要はありません。

これは、データがそれに適用される操作(たとえば、PTransform)のためにコレクションに供給される、データの限定されたコレクションまたは無限のコレクションの抽象です。また、並列化されています(クラスの冒頭のPが示唆するように)。

したがって、各ワーカー/ノードから要素の数を取得し、それらを結合して値を取得するメカニズムが必要です。それが0かnかどうかは、その変換が終わるまで知ることができません。

1

あなたは使用しているSDKを指定していないので、私はPythonを想定しました。コードはJavaに簡単に移植可能です。

単純な比較を適用することで、要素のグローバルカウントを適用し、数値をブール値にマップできます。あなたは、サイド入力をするために、この値を次のように、pvalue.AsSingleton機能を使用できるようになります:側入力の

import apache_beam as beam 
from apache_beam import pvalue 

is_empty_check = (your_pcollection 
        | "Count" >> beam.combiners.Count.Globally() 
        | "Is empty?" >> beam.Map(lambda n: n == 0) 
        ) 

another_pipeline_branch = (
    p 
    | beam.Map(do_something, is_empty=pvalue.AsSingleton(is_empty_check)) 
) 

使用方法は次のとおりです。

def do_something(element, is_empty): 
    if is_empty: 
     # yes 
    else: 
     # no 
関連する問題