2011-09-27 2 views
13

私はハードディスク上のバイナリファイルに格納されている非常に大きなデータセットを持っています。ここでは、ファイル構造の例である:バイナリファイルからナンシーアレイを効率的に作成する方法

ファイルヘッダは

4 Byte Int - Record Timestamp 

サンプル開始

149 Byte ASCII Header 

録音スタート

2 Byte Int - Data Stream 1 Sample 
2 Byte Int - Data Stream 2 Sample 
2 Byte Int - Data Stream 3 Sample 
2 Byte Int - Data Stream 4 Sample 

サンプルエンド

レコードごとに122,880サンプル、ファイルごとに713レコードがあります。これにより、合計サイズは700,910,521バイトになります。サンプルレートとレコード数は時々異なりますので、ファイルごとにそれぞれの数を検出するようにコーディングする必要があります。

現在、私は配列にこのデータをインポートするために使用したコードは次のように動作します。

from time import clock 
from numpy import zeros , int16 , int32 , hstack , array , savez 
from struct import unpack 
from os.path import getsize 

start_time = clock() 
file_size = getsize(input_file) 

with open(input_file,'rb') as openfile: 
    input_data = openfile.read() 

header = input_data[:149] 
record_size = int(header[23:31]) 
number_of_records = (file_size - 149)/record_size 
sample_rate = ((record_size - 4)/4)/2 

time_series = zeros(0,dtype=int32) 
t_series = zeros(0,dtype=int16) 
x_series = zeros(0,dtype=int16) 
y_series = zeros(0,dtype=int16) 
z_series = zeros(0,dtype=int16) 

for record in xrange(number_of_records): 

    time_stamp = array(unpack('<l' , input_data[ 149 + (record * record_size) : 149 + (record * record_size) + 4 ]) , dtype = int32) 
    unpacked_record = unpack('<' + str(sample_rate * 4) + 'h' , input_data[ 149 + (record * record_size) + 4 : 149 + ((record + 1) * record_size) ]) 

    record_t = zeros(sample_rate , dtype=int16) 
    record_x = zeros(sample_rate , dtype=int16) 
    record_y = zeros(sample_rate , dtype=int16) 
    record_z = zeros(sample_rate , dtype=int16) 

    for sample in xrange(sample_rate): 

    record_t[sample] = unpacked_record[ (sample * 4) + 0 ] 
    record_x[sample] = unpacked_record[ (sample * 4) + 1 ] 
    record_y[sample] = unpacked_record[ (sample * 4) + 2 ] 
    record_z[sample] = unpacked_record[ (sample * 4) + 3 ] 

    time_series = hstack ((time_series , time_stamp)) 
    t_series = hstack ((t_series , record_t)) 
    x_series = hstack ((x_series , record_x)) 
    y_series = hstack ((y_series , record_y)) 
    z_series = hstack ((z_series , record_z)) 

savez(output_file, t=t_series , x=x_series ,y=y_series, z=z_series, time=time_series) 
end_time = clock() 
print 'Total Time',end_time - start_time,'seconds' 

これは、現在、私には非常に高いと思われる700メガバイトのファイル、あたり約250秒かかります。私はこれを行うより効率的な方法がありますか?上記元のコードより27X速い9秒にランタイムを切断DTYPEカスタムとnumpyのFROMFILE方法を用いて最終ソリューション

。最終的なコードは以下の通りです。

from numpy import savez, dtype , fromfile 
from os.path import getsize 
from time import clock 

start_time = clock() 
file_size = getsize(input_file) 

openfile = open(input_file,'rb') 
header = openfile.read(149) 
record_size = int(header[23:31]) 
number_of_records = (file_size - 149)/record_size 
sample_rate = ((record_size - 4)/4)/2 

record_dtype = dtype([ ('timestamp' , '<i4') , ('samples' , '<i2' , (sample_rate , 4)) ]) 

data = fromfile(openfile , dtype = record_dtype , count = number_of_records) 
time_series = data['timestamp'] 
t_series = data['samples'][:,:,0].ravel() 
x_series = data['samples'][:,:,1].ravel() 
y_series = data['samples'][:,:,2].ravel() 
z_series = data['samples'][:,:,3].ravel() 

savez(output_file, t=t_series , x=x_series ,y=y_series, z=z_series, fid=time_series) 

end_time = clock() 

print 'It took',end_time - start_time,'seconds' 

+0

は、それが医療データですか? EDF?私が何を話しているのかわからない場合は、気にしないでください...;)とにかく、私はこの質問に従って医療データバイナリファイルを開くために私の答えを見てください:http://stackoverflow.com/q/5804052/401828。そこには興味深い議論があります。 – heltonbiker

+0

データは地球物理学ではありません。私は投稿する前に調査中にあなたの質問を見ました。あなたのデータは短いものだけで構成されています。残念ながら、ストリーム全体に4バイトのintタイムスタンプが散在しています。 – Stu

+0

何の価値があるのか​​、numpyの構造化配列の多くの操作は、通常のnumpy配列よりもずっと遅いです。インポート時間は速くなるかもしれませんが、計算に10-100倍かかることがあります:( –

答えて

13

いくつかのヒント:

このような何か(テストされていないが、あなたのアイデアを得る):

 
import numpy as np 

file = open(input_file, 'rb') 
header = file.read(149) 

# ... parse the header as you did ... 

record_dtype = np.dtype([ 
    ('timestamp', '<i4'), 
    ('samples', '<i2', (sample_rate, 4)) 
]) 

data = np.fromfile(file, dtype=record_dtype, count=number_of_records) 
# NB: count can be omitted -- it just reads the whole file then 

time_series = data['timestamp'] 
t_series = data['samples'][:,:,0].ravel() 
x_series = data['samples'][:,:,1].ravel() 
y_series = data['samples'][:,:,2].ravel() 
z_series = data['samples'][:,:,3].ravel() 
+0

savezを含む合計時間は9.07秒です!ありがとうございました。質問を最終的なコードで更新します。 – Stu

+0

偉大な答え、numpy内蔵機能の優れた使用! +1 – heltonbiker

+0

ヘッダーがあるかどうか、どのくらい知っていますか? – mmann1123

2

numpyのは、numpy.memmapを介してオブジェクトのような配列に直接データからマッピングバイナリをサポートします。ファイルをマップして、必要なデータをオフセットで抽出することができます。

エンディアン正確性については

だけであなたは、ホストシステムのエンディアンをチェックするための条件式を使用することができますあなたが読んでいるものにnumpy.byteswap使用します。

if struct.pack('=f', np.pi) == struct.pack('>f', np.pi): 
    # Host is big-endian, in-place conversion 
    arrayName.byteswap(True) 
+0

私はそれを見ましたが、データのエンディアンを指定する方法がないようです。 unixなので、エンディアンを明示的に指定する必要があります。 – Stu

+0

[numpy.byteswap](http://docs.scipy.org/doc/numpy/reference/generated/numpy.memmap.html)を使用して、正しいデータのエンディアンを必要に応じて変更してください。 – talonmies

+0

関数 'numpy.memmap'の+1は非常に便利です。 – heltonbiker

2

非効率性を睨ん一つは中hstackの使用でありますループ:

time_series = hstack ((time_series , time_stamp)) 
    t_series = hstack ((t_series , record_t)) 
    x_series = hstack ((x_series , record_x)) 
    y_series = hstack ((y_series , record_y)) 
    z_series = hstack ((z_series , record_z)) 

各繰り返しごとに、これはそれぞれの系列に対してわずかに大きな配列を割り当て、それまでに読み込まれたすべてのデータをコピーします。これにはロットの不要なコピーが含まれ、メモリの断片化が悪化する可能性があります。

は私がしたい、リストにtime_stampの値を蓄積し、最後に1 hstackを行い、そしてそれは、十分なパフォーマンスの向上を持参していない場合などrecord_t

ために、正確に同じことをするだろうと思いますループの本体をコメントアウトして、時間を費やしている場所を確認するために、一度に物事を戻し始めるでしょう。

+0

これをうまく実装すると、時間が110秒に短縮されました。 – Stu

+0

また、110秒のうち、約40は、私が最適化できないsavez関数用です。比較のために、.npzを読み込むのに20秒しかかかりません。 – Stu

+0

私はbeeを持っていなければなりませんn savezについては、カスタムdtypeを使用してfromfileメソッドを使用するとsavezを含む時間が9秒に短縮されます。 – Stu

0

arraystruct.unpackを使用すると、同様の問題(マルチ解像度のマルチチャネルバイナリデータファイル)で満足のいく結果が得られました。私の問題では、各チャンネルの連続データが必要でしたが、ファイルはチャンネル指向の構造ではなく、間隔指向の構造を持っていました。

f = open(somefilename, 'rb')  
fullsamples = array('h') 
fullsamples.fromfile(f, os.path.getsize(wholefilename)/2 - f.tell()) 
position = 0 
for rec in xrange(int(self.header['nrecs'])): 
    for channel in self.channel_labels: 
     samples = int(self.channel_content[channel]['nsamples']) 
     self.channel_content[channel]['recording'].extend(fullsamples[position:position+samples]) 
      position += samples 

「秘密」は最初のファイル全体を読み取り、そしてだけにして、所望の容器に知られたサイズのスライスを分配することである(下記のコードでは、self.channel_content[channel]['recording']arrayの目的です)

もちろん、これは他の回答よりも優れているとは言えませんが、少なくともあなたが評価できるものです。

希望すると助かります!