2016-11-01 9 views
0

私はXサイズとY期間のスライディングタイムウィンドウを使用しています。各ウィンドウの出力をマークするために、PCollectionの現在のウィンドウのタイムスタンプを取得したいと思います。現在のスライドウィンドウの最大タイムスタンプを取得する方法

PCollection<T> windowedInput = input 
     .apply(Window<T>into(
      SlidingWindows.of(Duration.standardMinutes(10)) 
         .every(Duration.standardMinutes(1)))); 

    // Extract key from each input and run a function per group. 
    // 
    // Q: ExtractKey() depends on the window triggered time. 
    // How can I pass the timestamp of windowedInputs to ExtractKey()? 
    PCollection<KV<K, Iterable<T>>> groupedInputs = windowedInputs 
    .apply(ParDo.of(new ExtractKey())) 
    .apply(GroupByKey.<K, Ts>create()); 

    // Run Story clustering and write outputs. 
    // 
    // Q: Also I'd like to add a window timestamp suffix to the output. 
    // How can I pass (or get) the timestamp to SomeDoFn()? 
    PCollection<String> results = groupedInputs.apply(ParDo.of(new SomeDoFn())); 

答えて

2

DoFn@ProcessElement方法で任意BoundedWindowパラメータを介して現在の要素のウィンドウにアクセスすることを許可されている。提案するため

class SomeDoFn extends DoFn<KV<K, Iterable<T>>, String> { 
    @ProcessElement 
    public void process(ProcessContext c, BoundedWindow window) { 
    ... 
    } 
} 
+0

おかげ。試してみます。 ExtractKey()のコードスニペットにウィンドウ情報を提供する同様の方法はありますか? – user7101240

+0

もちろん、ExtractKeyのProcessElementメソッドにBoundedWindowパラメータを追加するだけです。 – jkff

関連する問題