2012-05-14 17 views
6

REF整合性ルールを使用して、いくつかの10kレコードをデータベースに挿入します。いくつかのデータ行は残念ながら重複しています(データベース内にすでに存在しています)。 SQLAlchemyによってスローされたIntegrityError例外を処理し、エラーを記録してから続行するつもりであるため、挿入する前にデータベースのすべての行が存在するかどうかをチェックするのは高価です。SQLAlchemy IntegrityErrorとバルクデータのインポート

# establish connection to db etc. 

tbl = obtain_binding_to_sqlalchemy_orm() 
datarows = load_rows_to_import() 

try: 
    conn.execute(tbl.insert(), datarows) 
except IntegrityError as ie: 
    # eat error and keep going 
except Exception as e: 
    # do something else 

私は上記作っています(暗黙の)仮定はSQLAlchemyの1つのトランザクションに複数の挿入をロールされていないことです。

私のコードは次のようになります。私の前提が間違っている場合は、IntegrityErrorが発生した場合、残りの挿入が中止されることを意味します。上記の擬似コード "パターン"が期待どおりに動作するかどうかを誰でも確認できますか?または、致命的なIntegrityError例外の結果としてデータを失うことになりますか?

また、誰かがこれを行うより良いアイデアを持っているなら、私はそれを聞くことに興味があります。

答えて

1

これは、以前のトランザクションを開始しなかった場合、この場合はsqlalchemyのautocommit featureが入りますが、リンクに記載されているとおりに明示的に設定する必要があります。

0

ASCIIデータファイルを解析してデータをテーブルにインポートするときにもこの問題が発生しました。問題は、私が直感的にも直感的にも、SQLAlchemyが重複した行をスキップして一意のデータを許可したいということです。または、現在のSQLエンジン(ユニコード文字列が許可されていないなど)のために、ランダムエラーが行にスローされることがあります。

ただし、この動作はSQLインターフェイスの定義の範囲外です。 SQL API、したがってSQLAlchemyはトランザクションとコミットのみを理解し、この選択的な動作を考慮しません。さらに、自動コミット機能に依存するのは危険です。挿入が例外後に停止し、残りのデータが残るためです。

私の解決策(最もエレガントなものかどうかはわかりません)は、ループ内のすべての行を処理し、例外をキャッチしてログに記録し、最後に変更をコミットすることです。

何らかの形でリストのリスト、つまり列の値のリストである行のリストでデータを取得したとします。次に、ループ内のすべての行を読み込みます。

# Python 3.5 
from sqlalchemy import Table, create_engine 
import logging 

# Create the engine 
# Create the table 
# Parse the data file and save data in `rows` 

conn = engine.connect() 
trans = conn.begin() # Disables autocommit 

exceptions = {} 
totalRows = 0 
importedRows = 0 

ins = table.insert() 

for currentRowIdx, cols in enumerate(rows): 
    try: 
     conn.execute(ins.values(cols)) # try to insert the column values 
     importedRows += 1 

    except Exception as e: 
     exc_name = type(e).__name__ # save the exception name 
     if not exc_name in exceptions: 
      exceptions[exc_name] = [] 
     exceptions[exc_name].append(currentRowIdx) 

    totalRows += 1 

for key, val in exceptions.items(): 
    logging.warning("%d out of %d lines were not imported due to %s."%(len(val), totalRows, key)) 

logging.info("%d rows were imported."%(importedRows)) 

trans.commit() # Commit at the very end 
conn.close() 

この操作の速度を最大限にするには、自動コミットを無効にする必要があります。私はSQLiteでこのコードを使用していますが、オートコミットが無効の場合でも、sqlite3だけを使用している古いバージョンよりも3〜5倍遅いです。 (私がSQLAlchemyに移植した理由は、MySQLでそれを使用できることでした)。

SQLiteへのダイレクトインターフェイスほど高速ではないという点で最も洗練されたソリューションではありません。私がコードをプロファイルして近い将来ボトルネックを見つけたら、この答えをソリューションで更新します。

関連する問題