2011-11-15 19 views
24

多次元配列ファイルからPostgreSQLデータベースに転送するには数千万の行があります。私のツールはPythonとpsycopg2です。データを一括してアップロードする最も効率的な方法はcopy_fromです。しかし、私のデータはほとんどが32ビットの浮動小数点数(実数または浮動小数点数)なので、実際の→テキスト→実数に変換するのではなく、ここでは例としてデータベースのDDLは次のとおりです。私は、文字列(テキスト)を使ってPythonででてるのはここpsycopg2でバイナリCOPYテーブルFROMを使用

CREATE TABLE num_data 
(
    id serial PRIMARY KEY NOT NULL, 
    node integer NOT NULL, 
    ts smallint NOT NULL, 
    val1 real, 
    val2 double precision 
); 

は次のとおりです。

# Just one row of data 
num_row = [23253, 342, -15.336734, 2494627.949375] 

import psycopg2 
# Python3: 
from io import StringIO 
# Python2, use: from cStringIO import StringIO 

conn = psycopg2.connect("dbname=mydb user=postgres") 
curs = conn.cursor() 

# Convert floating point numbers to text, write to COPY input 
cpy = StringIO() 
cpy.write('\t'.join([repr(x) for x in num_row]) + '\n') 

# Insert data; database converts text back to floating point numbers 
cpy.seek(0) 
curs.copy_from(cpy, 'num_data', columns=('node', 'ts', 'val1', 'val2')) 
conn.commit() 

はバイナリモードを使用して仕事ができる相当するものはありますか?つまり、浮動小数点数はバイナリで保持しますか?これによって浮動小数点精度が保持されるだけでなく、高速になる可能性があります。

(注:例と同じ精度を確認するために、SET extra_float_digits='2'を使用)

+0

まあ、あなたは[COPYでバイナリファイルをインポートする]ことができますが(http://www.postgresql.org/docs/9.1/interactive/sql-copy.html)、そのためにはファイル全体が必要です1つの値だけでなく、特定のバイナリ形式。 –

+0

@Erwin、はい私はCOPYのバイナリモードについて読みましたが、psycopg2でサポートされているかどうか、あるいは別の方法を使用するべきかどうかはわかりません。 –

+0

私が使用したバイナリファイル形式の唯一のアプリケーションは、* PostgreSQLからエクスポートされたファイルをインポートすることです。私は特定のフォーマットを書くことができる他のプログラムについては知らない。しかしどこかでそこに出ることができないというわけではありません。繰り返し操作の場合は、Postgresをテキスト形式で一度コピーし、バイナリファイルを書き出し、次にCOPY FROM .. FORMAT BINARYを書き込んでください。 –

答えて

29

ここでは、Python 3 FROM COPYのバイナリ等価である:

from io import BytesIO 
from struct import pack 
import psycopg2 

# Two rows of data; "id" is not in the upstream data source 
# Columns: node, ts, val1, val2 
data = [(23253, 342, -15.336734, 2494627.949375), 
     (23256, 348, 43.23524, 2494827.949375)] 

conn = psycopg2.connect("dbname=mydb user=postgres") 
curs = conn.cursor() 

# Determine starting value for sequence 
curs.execute("SELECT nextval('num_data_id_seq')") 
id_seq = curs.fetchone()[0] 

# Make a binary file object for COPY FROM 
cpy = BytesIO() 
# 11-byte signature, no flags, no header extension 
cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0)) 

# Columns: id, node, ts, val1, val2 
# Zip: (column position, format, size) 
row_format = list(zip(range(-1, 4), 
         ('i', 'i', 'h', 'f', 'd'), 
         (4, 4, 2, 4, 8))) 
for row in data: 
    # Number of columns/fields (always 5) 
    cpy.write(pack('!h', 5)) 
    for col, fmt, size in row_format: 
     value = (id_seq if col == -1 else row[col]) 
     cpy.write(pack('!i' + fmt, size, value)) 
    id_seq += 1 # manually increment sequence outside of database 

# File trailer 
cpy.write(pack('!h', -1)) 

# Copy data to database 
cpy.seek(0) 
curs.copy_expert("COPY num_data FROM STDIN WITH BINARY", cpy) 

# Update sequence on database 
curs.execute("SELECT setval('num_data_id_seq', %s, false)", (id_seq,)) 
conn.commit() 

更新

私はCOPYのためのファイルを書く上のアプローチを書き直しました。 Pythonの私のデータはNumPy配列になっているので、これらを使うのは理にかなっています。ここで1M行、7列を持つと、いくつかの例dataです:

CREATE TABLE num_data_binary 
(
    id integer PRIMARY KEY, 
    node integer NOT NULL, 
    ts smallint NOT NULL, 
    s0 real, 
    s1 real, 
    s2 real, 
    s3 real, 
    s4 real, 
    s5 real, 
    s6 real 
) WITH (OIDS=FALSE); 

num_data_textという名前の別な表:私のデータベースで

import psycopg2 
import numpy as np 
from struct import pack 
from io import BytesIO 
from datetime import datetime 

conn = psycopg2.connect("dbname=mydb user=postgres") 
curs = conn.cursor() 

# NumPy record array 
shape = (7, 2000, 500) 
print('Generating data with %i rows, %i columns' % (shape[1]*shape[2], shape[0])) 

dtype = ([('id', 'i4'), ('node', 'i4'), ('ts', 'i2')] + 
     [('s' + str(x), 'f4') for x in range(shape[0])]) 
data = np.empty(shape[1]*shape[2], dtype) 
data['id'] = np.arange(shape[1]*shape[2]) + 1 
data['node'] = np.tile(np.arange(shape[1]) + 1, shape[2]) 
data['ts'] = np.repeat(np.arange(shape[2]) + 1, shape[1]) 
data['s0'] = np.random.rand(shape[1]*shape[2]) * 100 
prv = 's0' 
for nxt in data.dtype.names[4:]: 
    data[nxt] = data[prv] + np.random.rand(shape[1]*shape[2]) * 10 
    prv = nxt 

、私は2つのように見えるのテーブルを持っています。ここで

はnumpyのレコード配列内の情報を使用して、COPY(テキストとバイナリ形式の両方)のためのデータを準備するためにいくつかの簡単なヘルパー関数です:

def prepare_text(dat): 
    cpy = BytesIO() 
    for row in dat: 
     cpy.write('\t'.join([repr(x) for x in row]) + '\n') 
    return(cpy) 

def prepare_binary(dat): 
    pgcopy_dtype = [('num_fields','>i2')] 
    for field, dtype in dat.dtype.descr: 
     pgcopy_dtype += [(field + '_length', '>i4'), 
         (field, dtype.replace('<', '>'))] 
    pgcopy = np.empty(dat.shape, pgcopy_dtype) 
    pgcopy['num_fields'] = len(dat.dtype) 
    for i in range(len(dat.dtype)): 
     field = dat.dtype.names[i] 
     pgcopy[field + '_length'] = dat.dtype[i].alignment 
     pgcopy[field] = dat[field] 
    cpy = BytesIO() 
    cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0)) 
    cpy.write(pgcopy.tostring()) # all rows 
    cpy.write(pack('!h', -1)) # file trailer 
    return(cpy) 

この私がヘルパー関数を使用していますかベンチマーク2つのCOPYフォーマット方法:

def time_pgcopy(dat, table, binary): 
    print('Processing copy object for ' + table) 
    tstart = datetime.now() 
    if binary: 
     cpy = prepare_binary(dat) 
    else: # text 
     cpy = prepare_text(dat) 
    tendw = datetime.now() 
    print('Copy object prepared in ' + str(tendw - tstart) + '; ' + 
      str(cpy.tell()) + ' bytes; transfering to database') 
    cpy.seek(0) 
    if binary: 
     curs.copy_expert('COPY ' + table + ' FROM STDIN WITH BINARY', cpy) 
    else: # text 
     curs.copy_from(cpy, table) 
    conn.commit() 
    tend = datetime.now() 
    print('Database copy time: ' + str(tend - tendw)) 
    print('  Total time: ' + str(tend - tstart)) 
    return 

time_pgcopy(data, 'num_data_text', binary=False) 
time_pgcopy(data, 'num_data_binary', binary=True) 

ここでは、最後の2つのtime_pgcopyコマンドからの出力です:

Processing copy object for num_data_text 
Copy object prepared in 0:01:15.288695; 84355016 bytes; transfering to database 
Database copy time: 0:00:37.929166 
     Total time: 0:01:53.217861 
Processing copy object for num_data_binary 
Copy object prepared in 0:00:01.296143; 80000021 bytes; transfering to database 
Database copy time: 0:00:23.325952 
     Total time: 0:00:24.622095 

したがって、NumPy→ファイルとファイル→データベースステップの両方が、バイナリアプローチでは高速です。明らかな違いは、PythonがCOPYファイルをどのように準備するのかです。一般に、バイナリ形式は、このスキーマのテキスト形式として、2/3の時間でデータベースに読み込まれます。

最後に、データベース内の両方のテーブルの値を比較して、数値が異なるかどうかを確認しました。行の約1.46%が列s0の値が異なり、この部分はs6の6.17%に増加します(おそらく、私が使用したランダムメソッドに関連しています)。すべての70M 32ビット浮動小数点値間のゼロでない絶対差は、9.3132257e-010と7.6293945e-006の間の範囲です。テキスト読み込みメソッドとバイナリ読み込みメソッドの小さな違いは、テキストフォーマットメソッドに必要なfloat→text→float変換の精度の低下によるものです。

+0

かなりクールです。あなたはそれをどこかから手に入れて書きましたか?それは実際にパフォーマンスを改善しますか? –

+0

うまくいけば、クール!フォーマットはさておき、解決策はpsycopgの 'copy_expert()'を使うことです。 – piro

+0

@Erwin、はい、私は[コピー](http://www.postgresql.org/docs/current/interactive/sql-copy.html)、[struct](http:// docs。 python.org/library/struct.html)と[dtype](http://docs.scipy.org/doc/numpy/reference/arrays.dtypes.html)を参照してください。ベンチマークが入っており、見栄えがよくなっています。 –

1

Hereは私のバージョンです。マイクのバージョンに基づいています。

その非常にアドホックが、2つの長所があります。

  • hstoreバイナリ形式で記述する方法をreadline
  • 例に過負荷をかけることにより、ストリームとして発電機の役割を果たすことを期待しています。
関連する問題