2017-08-14 5 views
0

無制限のソースからデータフローパイプラインを読み取っています。私のウィンドウのサイズは10時間です、私はTestStreamを使って私のトリガをテストしようとしています。要素カウントがウィンドウ内の同じキーについて少なくとも2に達した場合、私のトリガーは早期の結果を出します。私はこれを達成するためのトリガーを次ていますクラウドデータフロー:一度トリガーが動作しない

input.apply(Window.into(FixedWindows.of(Duration.standardHours(12)))    .triggering(AfterWatermark.pastEndOfWindow() 
     .withEarlyFirings(AfterPane.elementCountAtLeast(2))) 
     .apply(Count.perElement()) 

を我々はまた、試してみました:

Repeatedly.forever(AfterPane.elementCountAtLeast(2)).orFinally(AfterWatermark.pastEndOfWindow()) 

結果をアサートするとき、私は早く発射を期待して、しかし私は

PAssert.that(pipeline).inWindow(..).. 
ですべての結果を得ることはありません

私は何が間違っていますか?また、同じテストを繰り返し実行すると、異なる結果が得られ、トリガーから異なる値が返されることを意味します。

答えて

2

トリガーは非決定的です。それはトリガー条件が満たされた後にしばらく時間をおいて発射します。トリガー条件が再び満たされた後、もう一度早期に発砲します。

トリガーの後に出す実際の選択は、ランナーによって決定されます。バッチランナーを使用している場合、すべてのデータが利用可能になるまで待機することがあります。各キー/ウィンドウの入力量はどれくらいですか?どのランナーを使用していますか?

+0

これはTestStreamのコンテキスト内にあります。それは今働いている。情報のおかげで。 – Mayumi

関連する問題