0

私は現在、BigQueryからデータを読み込み、最後に別のBigQueryタスクに書き戻すPython APIを使用して、より大きなApache Beamパイプラインを作成しています。大規模なオーバーヘッドを伴うApache Beamでのバッチ処理

変換の1つは、データを変換するためにバイナリプログラムを使用する必要があり、そのためにバイナリ検索データを含む23GBファイルをロードする必要があります。したがって、プログラムの起動と実行にはオーバーヘッドがかかり(毎回ロード/実行するのに約2分かかります)、RAMが必要となり、単一のレコードで起動するのは意味がありません。さらに、毎回23GBのファイルをCloud Storageからローカルにコピーする必要があります。

バイナリのためのワークフローは次のようになります。

  1. コピー23ギガバイトのファイルクラウドストレージからファイルにすでに

  2. 保存レコードが存在しない場合

  3. 呼び出しでバイナリを実行します()

  4. バイナリの出力を読み取り、返す

プログラムが一度に処理できるレコードの量は基本的に無制限なので、一度に処理するレコードの数を指定することができるように、幾分分散されたビーム変換を取得するといいでしょう。一度に100,000個)、それでも複数のノード上で一度に100,000個のレコードを実行できるように分散しています。

この動作をサポートするBeam​​は表示されません。いくつかの分割基準/キーに基づいてレコードを収集し、蓄積されたレコードに対してmerge_accumulatorsステップでバイナリを実行するKeyedCombineFn操作として何かを一緒にハッキングする可能性があります。しかし、これは私にとって非常にハッキリですね。

または、GroupByKeyとプロセスグループをバッチとして使用できますか?これは、各グループが一度に処理されることを保証するか、またはグループがビームによってシーンの後ろに分割されることを保証するか?

また、Java APIにはGroupIntoBatchesがありますが、これは私が必要と思うように聞こえますが、私が知る限りPython SDKでは利用できません。

私の2つの質問は、Apache Beamでこのユースケースを達成するための最良の方法(パフォーマンス上の措置)であり、良い解決策がない場合は、他のGoogle CloudサービスBeam --> Other Service --> Beamのように使用できますか?

答えて

1

グループはシーンの裏で分割できないので、GroupByKeyを使用すると効果があります。実際、個々の要素を1つのマシン上で処理しなければならず、GroupByKeyの後にあるキーを持つすべての値が同じ要素の一部であるため、これは要件です。

ランダムキーを割り当てることをお勧めします。指定されたキーで値が多すぎる場合は、それらの値をすべてプログラムに渡すのが難しいかもしれないので、一度にプログラムに渡す値の数を制限することもできますおよび/またはキーの割り当て方法を調整します。

乱数を割り当てるトリックの1つは、スタートバンドル(例えば1から1000)で乱数を生成し、それからプロセス要素でこれを増やして1001を1000にラップすることです。これにより、すべての要素に対して乱数を生成することが回避されます。キーの適切な配布を保証します。

このロジックの両方に対応するように、(同様の状況では、を分割して再利用可能な)PTransformを作成することができます。

+0

ヘッドアップありがとう。それを試してみて、それは本当に私が必要とするものを行う。 – Zenon

関連する問題