2017-04-10 2 views
2

PostgreSQL DBのシンプルなテーブルに膨大な数のエントリ(〜600k)をアップロードしたいのですが、各エントリごとに1つの外部キー、タイムスタンプ、3つのフロートがあります。しかし、hereと記述されているコア一括挿入を実行するには、各エントリごとに60ミリ秒かかるので、実行には10時間かかります。私はexecutemany()メソッドのパフォーマンス上の問題であることを知りましたが、execute_values()メソッドでpsycopg2 2.7に解決されています。sqlalchemyでpsycopg2.extrasを使用するにはどうすればよいですか?

私は実行するコードは以下の通りです:

#build a huge list of dicts, one dict for each entry 
engine.execute(SimpleTable.__table__.insert(), 
       values) # around 600k dicts in a list 

私はしかし、私はSQLAlchemyの自分自身で解決策を見つけるために管理していない、それは共通の問題であることがわかります。 sqlalchemyに何らかの機会にexecute_values()を呼び出すよう指示する方法はありますか?自分でSQL文を作成せずに巨大な挿入を実装する方法はありますか?

ありがとうございました!

+1

私は 'SimpleTable .__ table __。insert()。values(values)'を使って、複数のVALUESタプルを持つ単一のINSERT文にコンパイルすることを提案しようとしていましたが、実際には、 。コンパイル自体は、 'executemany()'に依存するあなたのメソッドを使うのと同じくらい遅かったです。 –

答えて

1

これは、SQLAlchemyにpsycopgエキストラの使用を指示しようとする試みではなく、エンジンからの基本的なpsycopg接続にアクセスすることができます

import io 
import csv 
from psycopg2 import sql 

def bulk_copy(engine, table, values): 
    csv_file = io.StringIO() 
    headers = list(values[0].keys()) 
    writer = csv.DictWriter(csv_file, headers) 
    writer.writerows(values) 

    csv_file.seek(0) 

    # NOTE: `format()` here is *not* `str.format()`, but 
    # `SQL.format()`. Never use plain string formatting. 
    copy_stmt = sql.SQL("COPY {} (" + 
         ",".join(["{}"] * len(headers)) + 
         ") FROM STDIN CSV").\ 
     format(sql.Identifier(str(table.name)), 
       *(sql.Identifier(col) for col in headers)) 

    # Fetch a raw psycopg connection from the SQLAlchemy engine 
    conn = engine.raw_connection() 
    try: 
     with conn.cursor() as cur: 
      cur.copy_expert(copy_stmt, csv_file) 

    except Exception as e: 
     conn.rollback() 
     raise e 

    else: 
     conn.commit() 

    finally: 
     conn.close() 

、その後

bulk_copy(engine, SimpleTable.__table__, values) 

これは、INSERT文を実行に比べて十分高速である必要があります:COPY FROMを使用可能にraw_connection()、と。このマシンで600,000レコードを移動するには、約8秒かかった、約13μs/レコードでした。また、生の接続とカーソルをエキストラパッケージと併用することもできます。

関連する問題