2016-08-29 18 views
0

同じアルゴリズムで処理する必要のある数百万行のPostgreSQLテーブルがあります。 私はこのタスクにPythonとSQLAlchemy.Coreを使用しています。PostgreSQLテーブルの分散処理

このアルゴリズムは、1つまたは複数の行を入力として受け入れ、更新された値を持つ同じ量の行を返します。

id1, id2, NULL, NULL, NULL -> id1, id2, value1, value2, value3 
id1, id3, NULL, NULL, NULL -> id1, id3, value4, value5, value6 
id2, id3, NULL, NULL, NULL -> id2, id3, value7, value8, value9 
... 
id_n, id_m, NULL, NULL, NULL -> id_n, id_m, value_xxx, value_yyy, value_zzz 

私はこのタスクを実行するためにPCクラスタを使用しています。このクラスタはdask.distributedスケジューラとワーカーを実行します。

このタスクは、map機能で効果的に実装できると思います。私の考えは、各作業者がデータベースを照会し、NULL値でいくつかの行を処理することを選択し、結果を更新します。

私の質問は、どのように労働者の間でテーブルの部分を配布することができるSQLクエリを記述するのですか?

Iは各作業者が放出する、SQLクエリでoffsetlimit各ワーカーの行のサブセットを定義するために試みた:

SQL:

select * from table where value1 is NULL offset N limit 100; 
... 
update table where id1 = ... and id2 = ... 
     set value1 = value...; 

パイソン:

from sqlalchemy import create_engine, bindparam, select, func 
from distributed import Executor, progress 

def process(offset, limit): 
    engine = create_engine(...) 

    # get next piece of work 
    query = select(...).where(...).limit(limit).offset(offset) 

    rows = engine.execute([select]).fetchall() 
    # process rows 

    # submit values to table 
    update_stmt = table.update().where(...).where(...).values(...) 
    up_values = ... 
    engine.execute(update_stmt, up_values) 

if __name__ == '__main__': 
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'), 
             port=config('SERVER_PORT'))) 
    n_rows = count_rows_to_process() 
    chunk_size = 100 
    progress(e.map(process, range(0, n_rows, chunk_size))) 

ただし、これは機能しませんでした。

range関数は、計算が開始される前にオフセットのリストを返し、map関数は、process関数を開始する前にワーカ間で分散しています。

一部の作業者は、作業量の処理を正常に完了し、結果をテーブルに提出し、値を更新しました。

新しい反復が開始され、新しいSELECT ...WHERE value1 is NULL LIMIT 100 OFFSET ...クエリがデータベースに送信されますが、オフセットは以前のワーカーがテーブルを更新する前に計算されたため無効になりました。 NULL値の量が削減され、ワー​​カーはデータベースから空のセットを受け取ることができます。

計算を開始する前に、SELECTクエリを使用することはできません。これは、RAMに収まらない巨大なテーブルを返すためです。

SQLAlchemyのマニュアルでは、分散処理ではエンジンインスタンスを各Pythonプロセスごとにローカルに作成する必要があります。したがって、データベースに一度問い合わせることはできず、返されたカーソルをprocess関数に送信することはできません。

したがって、解決方法はSQLクエリの正しい構築です。考慮すべき

+1

あなたのアルゴリズムは分散処理の恩恵を受けるでしょうか?それはCPUにかなり依存していますか?非常に多くの場合、データセットの大部分(すべてではないにしても)をメモリに格納することができます(何百万行ものオーダーですら)。オーバーヘッドのために単一のプロセスを使用する場合より速く終わります。それにもかかわらず、 'OFFSET'行をスキップしなければならないために遅い' LIMIT'と 'OFFSET'を実行する代わりに、主キー('(id1、id2) 'と仮定)と'(id1 、id2)BETWEEN(1,2)AND(3、4) 'となります。 – univerio

+0

私の場合には分散処理の代替手段はありません。このアルゴリズムはC言語で実装され、Pythonから呼び出されるPCとARM用のバイナリ実行ファイルでコンパイルされます。 – wl2776

答えて

1

1つのオプションはランダムです:最悪のシナリオでは

SELECT * 
FROM table 
WHERE value1 IS NULL 
ORDER BY random() 
LIMIT 100; 

あなたが並行して同じことを計算し、いくつかの労働者を持っています。それがあなたを気にしないなら、これは最も単純な方法の一つです。

他のオプションは、特定の作業者に個々の行を捧げている。

UPDATE table 
SET value1 = -9999 
WHERE id IN (
    SELECT id 
    FROM table 
    WHERE value1 IS NULL 
    ORDER BY random() 
    LIMIT 100 
) RETURNING * ; 

あなたは「マーク」の行は、あなたの特定の労働者が-9999で「とら」している。この道を。他のすべてのワーカーはvalue1がNULLではないため、これらの行をスキップします。ここでのリスクは、作業者が失敗した場合、単純にこれらの行に戻ることができないため、手動でNULLに戻す必要があるということです。

+0

あなたの2番目の選択肢が今朝私の心に浮かんだ – wl2776

関連する問題