2017-01-26 136 views
1

私は現在SQLAlchemy(GoogleのクラウドMySQLに接続しているGAE上)を使用し、テーブルの一括更新を行う必要があるWebアプリケーション(Flask)を作成しています。要するに、多くの計算が行われ、結果として1000のオブジェクトで更新する必要がある単一の値になります。現時点では、トランザクションですべてをやっていますが、最後には、フラッシュ/コミットに時間がかかります。SQLAlchemy一括更新の戦略

テーブルのインデックスはidであり、これはすべて1回のトランザクションで実行されます。だから、私はいつもの間違いを避けたと信じていますが、それはまだ非常に遅いです。

INFO  2017-01-26 00:45:46,412 log.py:109] UPDATE wallet SET balance=%(balance)s WHERE wallet.id = %(wallet_id)s 
2017-01-26 00:45:46,418 INFO sqlalchemy.engine.base.Engine ({'wallet_id': u'3c291a05-e2ed-11e6-9b55-19626d8c7624', 'balance': 1.8711760000000002}, {'wallet_id': u'3c352035-e2ed-11e6-a64c-19626d8c7624', 'balance': 1.5875759999999999}, {'wallet_id': u'3c52c047-e2ed-11e6-a903-19626d8c7624', 'balance': 1.441656} 

私の理解から、そこに実際にSQLでの一括更新を実行する方法はありませんし、上記のステートメントがサーバーに送信される複数のUPDATEステートメントなってしまいます。

私はSession.bulk_update_mappings()を使用しようとしましたが、実際に何もしていないようです:(理由はわかりませんが、実際には更新は実際には起こりません。パフォーマンススイート)を使用しようとしているかどうかは分かりません。

One technique I've seen discussed別のテーブルに一括挿入してからUPDATE JOINを実行しています。以下のようなテストを行いました。

wallets = db_session.query(Wallet).all() 
ledgers = [ Ledger(id=w.id, amount=w._balance) for w in wallets ] 
db_session.bulk_save_objects(ledgers) 
db_session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount') 
db_session.execute('TRUNCATE ledger') 

しかし、問題は私のコードをどのように構造化するかです。私はORMを使用しています。私はどうにかして元のWalletオブジェクトを「汚い」ことなく、古い方法でコミットしないようにする必要があります。私はちょうどこれらのLedgerオブジェクトを代わりに作成し、それらのリストを保持して、手動で一括操作の最後にそれらを挿入することができます。しかし、ORMの仕組みの一部を複製しているような臭いはほとんどありません。

これを行うにはスマートな方法がありますか?これまでのところ、私の脳が何か下がっている:私が言ったように

class Wallet(Base): 
    ... 
    _balance = Column(Float) 
    ... 

@property 
def balance(self): 
    # first check if we have a ledger of the same id 
    # and return the amount in that, otherwise... 
    return self._balance 

@balance.setter 
def balance(self, amount): 
    l = Ledger(id=self.id, amount=amount) 
    # add l to a list somewhere then process later 

# At the end of the transaction, do a bulk insert of Ledgers 
# and then do an UPDATE JOIN and TRUNCATE 

を、このすべては、私が持っている(こと)ツールと戦っているようです。これを処理するより良い方法はありますか?これを行うには、ORMメカニズムを利用できますか?あるいは、一括更新を行うためのより良い方法がありますか?

EDIT:または、イベントやセッションで賢明なことがありますか?たぶんbefore_flush?

EDIT 2:だから私は今イベントの機械を活用し、しようとしているが、これを持っている:

私にはかなりハックと悪のようだが、正常に動作するように見える
@event.listens_for(SignallingSession, 'before_flush') 
def before_flush(session, flush_context, instances): 
    ledgers = [] 

    if session.dirty: 
     for elem in session.dirty: 
      if (session.is_modified(elem, include_collections=False)): 
       if isinstance(elem, Wallet): 
        session.expunge(elem) 
        ledgers.append(Ledger(id=elem.id, amount=elem.balance)) 

    if ledgers: 
     session.bulk_save_objects(ledgers) 
     session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount') 
     session.execute('TRUNCATE ledger') 

。どんな落とし穴、またはより良いアプローチ?

-Matt

答えて

2

あなたは、本質的にやっていることは、パフォーマンスを最適化するために、ORMをバイパスしています。したがって、ORMがやっている作業を複製することに驚かないでください。なぜなら、これはあなたがする必要があることなのです。

このような一括アップデートが必要な場所がたくさんある場合を除いて、私は魔法のようなイベントのアプローチにはお勧めできません。明示的なクエリを書くだけではるかに簡単です。私は、更新を行うにはSQLAlchemyのコアの代わりに、ORMを使用していることをお勧めします何

ledger = Table("ledger", db.metadata, 
    Column("wallet_id", Integer, primary_key=True), 
    Column("new_balance", Float), 
    prefixes=["TEMPORARY"], 
) 


wallets = db_session.query(Wallet).all() 

# figure out new balances 
balance_map = {} 
for w in wallets: 
    balance_map[w.id] = calculate_new_balance(w) 

# create temp table with balances we need to update 
ledger.create(bind=db.session.get_bind()) 

# insert update data 
db.session.execute(ledger.insert().values([{"wallet_id": k, "new_balance": v} 
              for k, v in balance_map.items()]) 

# perform update 
db.session.execute(Wallet.__table__ 
         .update() 
         .values(balance=ledger.c.new_balance) 
         .where(Wallet.__table__.c.id == ledger.c.wallet_id)) 

# drop temp table 
ledger.drop(bind=db.session.get_bind()) 

# commit changes 
db.session.commit() 
+0

はい、私はORMとの戦いがあまりにも多すぎると思います。問題は、私はすでにこれのほとんどのためにORMを使用しているので、私は上記のフォームの何かを行うことができるようにそれを得るためにかなりの書き直しが必要になります。 'calculate_new_balance()'は以前の値を計算したものです(このユースケースはネットワークを介して資金を送り出しています)ので、モデルと元帳の両方を調べてみる必要がありますそれは正しい値を持っていた。しかし、ありがとう、これは確かに私に考えのための食糧を与えた! –

0

は、一般的には、頻繁に数千行を更新する必要があるために貧しいスキーマ設計です。それはさておき...

プランA:

CREATE TEMPORARY TABLE ToDo (
    id ..., 
    new_balance ... 
); 
INSERT INTO ToDo -- either one row at a time, or a bulk insert 
UPDATE wallet 
    JOIN ToDo USING(id) 
    SET wallet.balance = ToDo.new_balance; -- bulk update 

を生成ORMのコードを書く(構文を確認し、テスト;など)

START TRANSACTION; 
UPDATE wallet SET balance = ... WHERE id = ...; 
UPDATE wallet SET balance = ... WHERE id = ...; 
UPDATE wallet SET balance = ... WHERE id = ...; 
... 
COMMIT; 

プランBを生成書くORMコード

+0

ありがとう:)私はそれがスキーマ設計が貧弱かもしれないことは知っていますが、問題の事実は何とかこのプログラムの各繰り返しで行の1000を更新する必要があるということです。プランAは私が元々持っていたものでしたが、遅すぎました。上記のuniverioのコードと 'before_flush'リスナの組み合わせに基づいて、今やPlan Bが私の最終的なものになりました。いい加減なことは、リスナーを無効にして、デフォルトの動作(プランA)を持っていて、最適化されたアップデートを得る(プランB) –

+0

プランB(あなたの元帳)に関する注意 - 更新が完了し、あなたの 'TRUNATE'は、あなたがエントリを失う可能性があります。あなたのコードをSQLに翻訳できれば、私は詳しく述べることができます。 –