2017-12-20 14 views
0

Google DataflowにApache Beam SDKを使用する一連のトランザクションから日次の残高を計算したいと考えています。目的の結果セットは次のようになりApache Beamを使用した合計の計算方法

John, 2017-12-01, 100 
John, 2017-12-01, 200 
Jane, 2017-12-01, 150 
John, 2017-12-02, -100 
John, 2017-12-02, 300 

例のデータセットは、受取人の名前、取引日付、金額でこのように見えるかもしれ

John, 2017-12-01, 300 (100 + 200) 
Jane, 2017-12-01, 150 
John, 2017-12-02, 500 (300 + -100 + 300) 

I BigDecimalの合計を計算したが、前日の期末残高を翌日の開始残高として考慮していないCombine.perKey関数を持つKV<Pair, BigDecimal>を使用してみました。

答えて

1

ビームのウィンドウAPIは、ここで使用する権利のことです。

https://beam.apache.org/documentation/programming-guide/#windowing

具体的には、あなたが質問に答える必要があります:

イベント時にあなたが集計を実行したいと思います
  1. 処理時間に回答がありますか?

これらの質問に答えるためにあなたのポストにはかなり十分な情報がありません - あなたはより多くの詳細を提供する必要があるだろう - あなたは、バッチまたはストリーミングモードで実行していますか?毎日の終わりに1つの回答が欲しいのですか、新しい取引があるたびに更新された合計が必要ですか?間に何か?推測しなければならないのは、グローバルな合計(グローバルイベント時間ウィンドウ)を維持し、現在の値で1日に1回更新を取得するように思えます。

我々は上記の質問に答えたら、私たちはいくつかの擬似コードを書くことができますが:

PCollection<KV<String, Double>> transactionsByName = .... ;  // Read input 

PCollection<KV<String, Double> dailyTotalsByName = transactionsByName 
    // Group by name 
    .apply(GroupByKey.<String, Double>create()) 
    // 1-day windows  
    .apply(Window.<KV<String, Iterable<Double>>>into( 
     FixedWindows.of(Duration.standardDays(1))))  
    // Combine each window (see combiners guide [here][1]) 
    .apply(Combine.<String, Iterable<Double>, Double>perKey(new SumTotals())); 

PCollection<KV<String, Double> globalTotalsByName = dailyTotalsByName 
    // Global windows allow you to combine a running total. Triggering condition 
    // specifies 'when' in processing time the answers are materialized. Here we 
    // have chosen to output the answer each time a new daily total arrives. 
    .apply(Window.<KV<String, Iterable<Double>>>into(new GlobalWindows())) 
    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))) 
    // Combine daily totals 
    .apply(Combine.<String, Iterable<Double>, Double>perKey(new SumTotals())) 

上記のコードはそのまま正確に構築するが、少なくとも1つの合理的なアプローチの概要を説明しない場合があります。もちろん、入力や問題の詳細に応じて、トリガーの頻度などを調整する必要があるかもしれません。前述したように、これは毎日の終わりに結果を出すだけです。ライブ実行中の合計が必要な場合は、より複雑なトリガー条件を使用して現在の値を出力できます。

+0

お返事ありがとうございます。私が記述しているプロセスはバウンド・バッチ・プロセスです。これは、随時スケジュールでBigtableテーブルからデータを読み込み、日々の残高を計算し、別のBigtableテーブルに結果を書き込んでいます。ウィンドウ処理に関しては、コレクション要素のタイムスタンプに関連したものであり、任意のデータ要素ではないという印象を受けました。このプロセスは、過去の日付のデータ要素に対して今日処理されている可能性があります。 –

+0

https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements に記載されているように、要素にタイムスタンプを手動で割り当てる必要があると思われるかもしれません。 SumTotals()クラス、具体的に拡張または実装するもの、およびそのシグネチャを拡張できますか?私はコンパイルするソリューションに着陸することはできません。 –

関連する問題