Beamではスライディングウィンドウがネイティブにサポートされています。 SlidingWindowsクラスについてはprogramming guideとドキュメントをご覧ください。
例えば:
PCollection<Foo> foos = ...;
PCollection<Integer> counts = foos
.apply(Window.into(
SlidingWindows.of(Duration.standardMinutes(5))
.every(Duration.standardMinutes(1))))
// Below is required instead of Count.globally() when you use
// a non-global windowing function.
.apply(Combine.globally(Count.<Foo>combineFn()).withoutDefaults());
PCollection<String> formattedCounts = counts.apply(
ParDo.of(new DoFn<Integer, String>() {
@ProcessElement
public void process(ProcessContext c, BoundedWindow w) {
c.output("Window: " + w + ", count: " + c.element());
}
}));
トリガは、問題の別の次元であり、それは特定のウィンドウのデータが考慮されるとき、「十分に完了」の集約を適用するために制御します。 programming guideを参照してください。