2012-01-19 8 views
3

問題は、n個のプロセスで約20GBのサイズのファイルを同時に読み取ることです。ファイルには各行に1つの文字列が含まれ、文字列の長さは同じでも異なっていてもかまいません。文字列の長さは、たかだか10バイトまでです。cでファイルを並列読み書きする

私は16のノードを持つクラスターを持っています。各ノードはユニプロセッサーで、6GBのRAMを搭載しています。MPIを使用して並列コードを作成しています。

すべてのリソースを利用できるように、この大きなファイルを効率的に分割する方法を教えてください。

注:パーティションの制約は、ファイルを固定数の行のチャンクとして読み込むことです。 ファイルに1600行(たとえば、1600文字列)が含まれていると仮定します。最初のプロセスは1行目から100行目まで、2行目は101行目から200行目までのようにする必要があります。

複数のプロセスでファイルを読み取ることはできません1つの文字列だけを指すファイルハンドラが1つしかないため、時間がかかります。どのように他のプロセスが異なるチャンクから並行して読み込むことができますか?

答えて

0

パーティションに制約があるかどうかは指定しないので、何もないと仮定します。私はまた、パーティションを可能な限り同じサイズに近づけたいと考えています。

単純なアプローチは、ファイルをサイズ20GB/nのチャンクに分割することです。チャンクの開始位置はです。i*20GB/ni=0..n-1です。

問題は、もちろん、チャンク境界が入力ファイルの行間にあるという保証はありません。一般的に、彼らはしません。

幸いにも、これを修正する簡単な方法があります。上記のように境界線を設定したら、それぞれを(i=0を除く)次の改行の後に置くように少しずつシフトします。

これは、ファイルの15個の小さな断片を読み込むことになりますが、非常に均等なパーティションになります。

実際、ノードごとに個別に修正を行うことはできますが、説明を複雑にする価値はありません。

+0

ありがとうございました!実り多い答えとして、私は自分の制約を加えました。 – Gopal

+0

あなたの均等分配ロジックはとても良いです。しかし、一度に複数のプロセスでファイルを読み取ることはできません。ファイルハンドラが1つのみの文字列を指していると思います。他のプロセスはどのようにして異なるチャンクから並行して読み込むことができますか? – Gopal

0

私は、行の長さを取得し、行をプロセスに配布するコードを書く方が良いと思います。その分散関数は、文字列自体ではなく、その長さでのみ機能します。

固定サイズのソースを均等に分配するためのalgorythmを見つけることは問題ではありません。

その後、配布機能は、他のプロセスに作業のためにどのような部分が必要かを伝えます。プロセス0(ディストリビューター)は行を読み取ります。それはすでに知っている、その行番号。 1はプロセス1で処理する必要があります。... P.0は行番号numを読み込みます。 Nとはどのようなプロセスがそれを処理しなければならないかを知っています。

ああ!最初から配布を最適化する必要はありません。単にディストリビュータープロセスが入力から新しい行を読み込んで、それを自由なプロセスに渡すだけです。それで全部です。

したがって、大きく最適化された簡単なソリューションが2つあります。

ディストリビュータープロセスが未読の文字列を時々再最適化すれば、さらに最適化に達することができます。

+0

先生、私は文字列が可変長だと思います。そのような大きなファイルの文字列の長さを見つけて、それらをクラスタ全体に分散させるのは非常に複雑です。 – Gopal

+0

拝啓、配布機能は、文字列自身ではなく、その長さで機能すると思います。これの問題は何ですか? – Gangnus

+0

その後、配布機能は他のプロセスに彼らが仕事のためにどのような要素を必要としているかを伝えます。 – Gangnus

4

あなたが発見しているように、大量のデータを処理するにはテキストファイル形式が貧弱です。バイナリ形式よりも大きいだけでなく、ここ(改行のためのseaching)などの書式設定の問題に遭遇し、すべてが(データは文字列に変換される必要があります)です。テキストベースのフォーマットと数値データのバイナリフォーマットの間のIO速度は、10倍の差があります。しかし、今はあなたがテキストファイル形式に悩まされていると仮定します。

おそらく、あなたはスピードのためにこのパーティショニングを行っています。しかし、並列ファイルシステム、つまり複数のディスクからサービスを提供する複数のサーバーとそれらを調整できるFSがない限り、同じファイルから複数のMPIタスクを読み込むことから大幅な高速化が起きることはほとんどありません最終的には、これらの要求はいずれもサーバー/コントローラ/ディスクレベルでシリアル化されます。

さらに、大量のデータを読むことは、fseek()を改行したり、改行を探して小さな読み込みを行うよりもはるかに高速になります。

私の提案は、すべてのデータをできるだけ少数のチャンクで読み込み、関連する行を各タスク(最終的にはそれ自体を含む)に送ることです。ファイルが最初に何行あるか知っていれば、これはかなり簡単です。 2GBのデータを読み込み、N/Pth行の最後をメモリで検索し、それをタスク0に送信し、タスク0に「完了したデータ」メッセージを送り、続行します。

+0

このような価値ある洞察力を与えてくれてありがとう。私は自分のクラスタでNFSを設定しており、ファイルポインタを(myrank * N/Pth)行にオフセットすることによって大きなデータブロックを読み込んでいます。私はMPI_File_read(いくつかのパラメータ)関数を使用して、ファイルからこのたくさんのデータブロックを読み込みます。ここでは3番目の引数をバイト数として渡す必要があります。しかし、私の必要条件はN/Pth個のブロックを読み込むことです。行数はバイト数ではありません。それでどうやってこれから出ることができますか? – Gopal

+0

Sir解決策を私に教えてください。 – Gopal

+1

できません。そのため、テキストファイルは大量のデータに対してはあまり好ましくありません。ファイルを実際に検索することによって行がどこにあるか知ることができますが、これは非常に高価です。そのため、1つのプロセッサで全体の読み取りを行い、ファイルを配布することをお勧めします。レコード長が固定されたバイナリファイルを使用していた場合、MPI-IOのアプローチは良いでしょう(単一のNFSサーバではまだパフォーマンスに限界があります)。 –

0

ここでは、pythonでmpiとpypar拡張子を使用して、大きなファイル内の行数をmpiを使っていくつかのホスト間で分担する関数を示します。

def getFileLineCount(file1): 
    import pypar, mmap, os 
    """ 
    uses pypar and mpi to speed up counting lines 
    parameters: 
     file1 - the file name to count lines 
    returns: 
     (line count) 
    """ 

    p1 = open(file1, "r") 
    f1 = mmap.mmap(p1.fileno(), 0, None, mmap.ACCESS_READ) 

    #work out file size 
    fSize = os.stat(file1).st_size 
    #divide up to farm out line counting 
    chunk = (fSize/pypar.size()) + 1 

    lines = 0 
    #set start and end locations 
    seekStart = chunk * (pypar.rank()) 
    seekEnd = chunk * (pypar.rank() + 1) 
    if seekEnd > fSize: 
     seekEnd = fSize 

    #find start of next line after chunk 
    if pypar.rank() > 0: 
     f1.seek(seekStart) 
     l1 = f1.readline() 
     seekStart = f1.tell() 

    #tell previous rank my seek start to make their seek end 
    if pypar.rank() > 0: 
#  logging.info('Sending to %d, seek start %d' % (pypar.rank() - 1, seekStart)) 
     pypar.send(seekStart, pypar.rank() - 1) 
    if pypar.rank() < pypar.size() - 1: 
     seekEnd = pypar.receive(pypar.rank() + 1) 
#  logging.info('Receiving from %d, seek end %d' % (pypar.rank() + 1, seekEnd)) 

    f1.seek(seekStart) 

    logging.info('Calculating line lengths and positions from file byte %d to %d' % (seekStart, seekEnd)) 

    l1 = f1.readline() 
    prevLine = l1 

    while len(l1) > 0: 
     lines += 1 

     l1 = f1.readline() 
     if f1.tell() > seekEnd or len(l1) == 0: 
      break 

     prevLine = l1 
    #while 
    f1.close() 
    p1.close() 

    if pypar.rank() == 0: 
     logging.info('Receiving line info') 
     for p in range(1, pypar.size()): 
      lines += pypar.receive(p) 
    else: 
     logging.info('Sending my line info') 
     pypar.send(lines, 0) 

    lines = pypar.broadcast(lines) 
    return (lines) 
+0

さて、なぜですか? –

+0

そこにいたので。私は、各行のファイルposとその幅を記録する必要があるアプリケーションを持っています。私はそのコードを取り出し、この例の興味深い部分を削除しました。 – Martlark