同じアルゴリズムで処理する必要のある数百万行のPostgreSQLテーブルがあります。 私はこのタスクにPythonとSQLAlchemy.Coreを使用しています。PostgreSQLテーブルの分散処理
このアルゴリズムは、1つまたは複数の行を入力として受け入れ、更新された値を持つ同じ量の行を返します。
id1, id2, NULL, NULL, NULL -> id1, id2, value1, value2, value3
id1, id3, NULL, NULL, NULL -> id1, id3, value4, value5, value6
id2, id3, NULL, NULL, NULL -> id2, id3, value7, value8, value9
...
id_n, id_m, NULL, NULL, NULL -> id_n, id_m, value_xxx, value_yyy, value_zzz
私はこのタスクを実行するためにPCクラスタを使用しています。このクラスタはdask.distributed
スケジューラとワーカーを実行します。
このタスクは、map
機能で効果的に実装できると思います。私の考えは、各作業者がデータベースを照会し、NULL値でいくつかの行を処理することを選択し、結果を更新します。
私の質問は、どのように労働者の間でテーブルの部分を配布することができるSQLクエリを記述するのですか?
Iは各作業者が放出する、SQLクエリでoffset
とlimit
各ワーカーの行のサブセットを定義するために試みた:
SQL:
select * from table where value1 is NULL offset N limit 100;
...
update table where id1 = ... and id2 = ...
set value1 = value...;
パイソン:
from sqlalchemy import create_engine, bindparam, select, func
from distributed import Executor, progress
def process(offset, limit):
engine = create_engine(...)
# get next piece of work
query = select(...).where(...).limit(limit).offset(offset)
rows = engine.execute([select]).fetchall()
# process rows
# submit values to table
update_stmt = table.update().where(...).where(...).values(...)
up_values = ...
engine.execute(update_stmt, up_values)
if __name__ == '__main__':
e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
port=config('SERVER_PORT')))
n_rows = count_rows_to_process()
chunk_size = 100
progress(e.map(process, range(0, n_rows, chunk_size)))
ただし、これは機能しませんでした。
range
関数は、計算が開始される前にオフセットのリストを返し、map
関数は、process
関数を開始する前にワーカ間で分散しています。
一部の作業者は、作業量の処理を正常に完了し、結果をテーブルに提出し、値を更新しました。
新しい反復が開始され、新しいSELECT ...WHERE value1 is NULL LIMIT 100 OFFSET ...
クエリがデータベースに送信されますが、オフセットは以前のワーカーがテーブルを更新する前に計算されたため無効になりました。 NULL値の量が削減され、ワーカーはデータベースから空のセットを受け取ることができます。
計算を開始する前に、SELECT
クエリを使用することはできません。これは、RAMに収まらない巨大なテーブルを返すためです。
SQLAlchemyのマニュアルでは、分散処理ではエンジンインスタンスを各Pythonプロセスごとにローカルに作成する必要があります。したがって、データベースに一度問い合わせることはできず、返されたカーソルをprocess
関数に送信することはできません。
したがって、解決方法はSQLクエリの正しい構築です。考慮すべき
あなたのアルゴリズムは分散処理の恩恵を受けるでしょうか?それはCPUにかなり依存していますか?非常に多くの場合、データセットの大部分(すべてではないにしても)をメモリに格納することができます(何百万行ものオーダーですら)。オーバーヘッドのために単一のプロセスを使用する場合より速く終わります。それにもかかわらず、 'OFFSET'行をスキップしなければならないために遅い' LIMIT'と 'OFFSET'を実行する代わりに、主キー('(id1、id2) 'と仮定)と'(id1 、id2)BETWEEN(1,2)AND(3、4) 'となります。 – univerio
私の場合には分散処理の代替手段はありません。このアルゴリズムはC言語で実装され、Pythonから呼び出されるPCとARM用のバイナリ実行ファイルでコンパイルされます。 – wl2776