2017-06-27 6 views
1

Google Dataflow/Beamのlead/lag関数のようなSQLを実行する方法を探しています。 SQLで行われていれば、私の場合、それはDataflow(Beam)でSQLのリード/ラグ関数を実行できるトランスフォームの作成方法

ビーム内
lead(balance, 1) over(partition by orderId order by order_Date) 

ようになり、我々は、入力されたテキストファイルを解析し、データを保持するクラスClient_Ordersを作成します。簡単にするため、このクラスのメンバーはorderIdorder_Dateおよびbalanceとしましょう。そして、我々はPCollectionsビームにおける

PCollection <KV<String, Iterable<Client_Orders>>> mainCollection = pipeline.apply(TextIO.Read.named("Reading input file") 
.from(options.getInputFilePath())) 
.apply(ParDo.named("Extracting client order terms from file") // to produce Client_Orders object 
    .apply('create KV...", GroupByKey.<String, Client_Orders>create()); 

にKVを構築することによりorderIdでパーティションを作成し、私たちは、ウィンドウを行うことができます知っているが、それは期間Windows.of(Duration.standardDays(n))の面でウィンドウサイズを設定するには、一般的には必要ですが、それはしませんこの場合、私はorder_Dateを使用してPCollectionを繰り返す必要がありますか?

答えて

1

メモリ内でソートするためにデータがキーごとに大きすぎる場合は、ビーム"sorter" extensionが必要です。

私は説明する:

ビーム(したがって、データフロー)でPCollectionの要素は順不同です。これにより、同じデータがリアルタイムストリームとして到着した場合でも、格納されたファイルから読み込まれた場合でも、同じ出力が得られる統一プログラミングモデルがサポートされます。また、孤立した障害回復をサポートし、ネットワークの遅延に対する堅牢性を提供します。

大規模なデータ処理の長年にわたって、グローバルオーダーのほとんどすべての用途は有用ではないことが判明しました。スケーラビリティは、目標を達成するための別の方法を見つける)。また、グローバル秩序が存在しても、処理が順番に行われるわけではないので(並列であるため)、グローバル秩序はほとんど直ちに失われます。したがって、グローバル秩序はロードマップ上にありません。

ただし、必要な注文の種類はキーごとです。これは一般的で有用であり、しばしば「値の並べ替え」として知られています。 GroupByKey操作によって、キー(グループKV<K, Iterable<V>>の要素)のグループ化された値が得られる場合、ユーザー定義の値の順序はになることがよくあります。単一の要素内でをソートしているため、は、要素がパイプラインを通過する際に順序が保持されます。また、値をソートするのは必ずしも費用がかかりません。キーによってグループ化されたものと同じ操作で、グループ化されている値を並べ替えることもできます。これはビームロードマップにありますが、ビームモデルの一部ではありません。

今のところ、値をソートできる上記のJavaベースの拡張機能があります。

+0

ありがとう、@ケン、私は試してみます。後で更新します。 – bignano

+0

それはあまりにも大きくない場合、どのようにメモリ内のキーごとのソートを行うのだろうか? – bignano

+0

こんにちは@Kenn、私はこの 'GroupByKey.SortValuesByTimestamp 'を見つけました。これが私が必要とするものであれば、私はorder_Dateで注文するつもりですか?ありがとう – bignano

関連する問題