2015-11-16 20 views
5

ここでは、プログラミングモデルの周りに頭を抱えようとしています。シナリオ私はPub/Sub + Dataflowを使用してWebフォーラムのアナリティクスを計測しています。私は仕事がしたいGoogle Dataflowで長生きの状態

ID | TS | num_comments 
1 | 1 | 0 
1 | 2 | 1 
2 | 2 | 0 
1 | 4 | 2 

ID | TS | EventType 
1 | 1 | Create 
1 | 2 | Comment 
2 | 2 | Create 
1 | 4 | Comment 

そして、私がどのように見えるデータフローからの流れで終わるしたい:私は次のようになりパブ/サブから来るデータのストリームを持っていますこのロールアップはストリームプロセスとして実行され、新しいイベントが入ったときに新しいカウントが入力されます。私の質問は、ジョブが現在のトピックIDとコメントカウントの状態を保存するための慣用的な場所はどこですか?トピックは長年にわたって生きることができると仮定します。現在のアイデアは以下の通りです。

  • は、私がこれを書いたとしても私はありませんBigTableのにとトピックIDの現在のコメント数がで来ているものをDoFnクエリ内のトピックIDは「現在」エントリを書きますファン。
  • 何らかの方法でサイド入力を使用しますか?これは答えかもしれないようですが、もしそうなら私は完全に理解していません。
  • ストリーミングジョブをグローバルウィンドウで設定します。トリガーはレコードを取得するたびにオフになり、データフローを使用してペインの履歴全体をどこかに保持します。 (無制限のストレージ要件ですか?)

EDIT:これらの3つの戦略、またはそれを実行する他のさまざまな方法のいずれかを実装するのに問題はありません。 最高の Dataflowでこれを行う方法。バックフィルの履歴を再処理する必要があるもの

EDIT2:データフローサービスにバグがあり、現在、フラットトランスフォームに入力を追加すると更新が失敗するというバグがありますフラット操作に何かを追加することを含むジョブを変更した場合、そのジョブで発生した状態を破棄して再構築する必要があります。

答えて

7

これを実行するには、トリガーと結合を使用できるようにする必要があります。

PCollection<ID> comments = /* IDs from the source */; 
PCollection<KV<ID, Long>> commentCounts = comments 
    // Produce speculative results by triggering as data comes in. 
    // Note that this won't trigger after *every* element, but it will 
    // trigger relatively quickly (as the system divides incoming data 
    // into work units). You could also throttle this with something 
    // like: 
    // AfterProcessingTime.pastFirstElementInPane() 
    //  .plusDelayOf(Duration.standardMinutes(5)) 
    // which will produce output every 5 minutes 
    .apply(Window.triggering(
      Repeatedly.forever(AfterPane.elementCountAtLeast(1))) 
     .accumulatingFiredPanes()) 
    // Count the occurrences of each ID 
    .apply(Count.perElement()); 

// Produce an output String -- in your use case you'd want to produce 
// a row and write it to the appropriate source 
commentCounts.apply(new DoFn<KV<ID, Long>, String>() { 
    public void processElement(ProcessContext c) { 
    KV<ID, Long> element = c.element(); 
    // This includes details about the pane of the window being 
    // processed, and including a strictly increasing index of the 
    // number of panes that have been produced for the key.   
    PaneInfo pane = c.pane(); 
    return element.key() + " | " + pane.getIndex() + " | " + element.value(); 
    } 
}); 

あなたのデータに応じて、また、ソースからの全体のコメントを読んIDを抽出し、各IDのカウントを取得するためにCount.perKey()を使用することができます。より複雑な組み合わせが必要な場合は、カスタムCombineFnを定義し、Combine.perKeyを使用して見ることができます。

+0

右のように、これは私の潜在的な実装のリストでは3番です。私の質問は、それはいい考えですか?ここの状態は、データフローによって暗黙的に維持されています。ジョブを再開する必要がある場合はどうなりますか?歴史的なバックフィルはどのように実装されますか? – bfabry

+1

変更内容に応じて、[既存のパイプラインを更新する](https://cloud.google.com/dataflow/pipelines/updating-a-pipeline)することができます。変更がより重要な場合は、すべての古いデータを読み取ることができるカスタムソースを使用している場合は、前述のアプローチが有効です。 –

+0

バックフィルなどを処理するためのカスタムソースは興味深いアイデアです。それはその問題を解決するようです。それは良い考えですか?それは永遠に成長するだけの状態ですか?フォーラムの話題が閉じられることができたら、「このIDのために私たちが心配しているイベントはもうなくなります」と言って、それが捨てられるようにする方法はありますか? – bfabry

2

のBigQueryが上書き行をサポートしていないので、これについて移動する一つの方法は、BigQueryのにイベントを書き込み、COUNTを使用してデータを照会することです:

SELECT ID、ID BY表GROUPからCOUNT(NUM_COMMENTS)。

BigQueryにエントリを書き込む前に、Dataflow内のnum_commentsのウィンドウごとの集計を行うこともできます。上記のクエリは引き続き機能します。

+0

ショットをくれてありがとう:-)問題の目的のために、宛先がBQであることを無視することができます。目的地は追加する必要があります。実生活における計算は単なる集計よりも複雑であり、複雑な(そして実行するのに高価な)BQクエリではなく、データフローにおけるETLで表現することを好む。また、この解決策は、トピックに対するコメントの数の時系列履歴を私たちに与えるものではありません。 – bfabry

関連する問題