2012-01-25 12 views
2

私は、multiprocessing.pyを使用して巨大なテキストファイルをフィルタリングしています。コードは基本的にテキストファイルを開き、それを処理して閉じます。逐次マルチプロセッシング

これは、複数のテキストファイルで連続して起動できるようにすることです。したがって、私はループを追加しようとしましたが、何らかの理由でそれが動作しません(コードは各ファイルで機能します)。これは問題です:

if __name__ == '__main__':  

しかし、私は何か他のものを探しています。私はこのようなランチャーとLauncherCountファイルを作成しようとしました:

LauncherCount.py: 

    def setLauncherCount(n): 
     global LauncherCount 
     LauncherCount = n 

と、

Launcher.py: 

import os 
import LauncherCount 

LauncherCount.setLauncherCount(0) 

os.system("OrientedFilterNoLoop.py") 

LauncherCount.setLauncherCount(1) 

os.system("OrientedFilterNoLoop.py") 

... 

私はLauncherCount.pyをインポートして、私のループインデックスとしてLauncherCount.LauncherCount使用します。

もちろん、これは変数LauncherCount.LauncherCountをローカルで編集するので機能しませんので、インポートされたLauncherCountのバージョンでは編集されません。

インポートされたファイル内の変数をグローバルに編集する方法はありますか?あるいは、他の方法でこれを行う方法はありますか?私が必要とするのは、コードを複数回実行し、1つの値を変更して、明らかにループを使用しないことです。

ありがとうございます!

編集:必要に応じてここに私のメインコードがあります。悪いスタイルのため申し訳ありません...

import multiprocessing 
import config 
import time 
import LauncherCount 

class Filter: 

    """ Filtering methods """ 
    def __init__(self): 
     print("launching methods") 

     # Return the list: [Latitude,Longitude] (elements are floating point numbers) 
    def LatLong(self,line): 

     comaCount = [] 
     comaCount.append(line.find(',')) 
     comaCount.append(line.find(',',comaCount[0] + 1)) 
    comaCount.append(line.find(',',comaCount[1] + 1)) 
    Lat = line[comaCount[0] + 1 : comaCount[1]] 
    Long = line[comaCount[1] + 1 : comaCount[2]] 

    try: 
     return [float(Lat) , float(Long)] 
    except ValueError: 
     return [0,0] 

# Return a boolean: 
# - True if the Lat/Long is within the Lat/Long rectangle defined by: 
#   tupleFilter = (minLat,maxLat,minLong,maxLong) 
# - False if not                 
def LatLongFilter(self,LatLongList , tupleFilter) : 
    if tupleFilter[0] <= LatLongList[0] <= tupleFilter[1] and 
     tupleFilter[2] <= LatLongList[1] <= tupleFilter[3]: 
     return True 
    else: 
     return False 

def writeLine(self,key,line): 
    filterDico[key][1].write(line) 



def filteringProcess(dico): 

    myFilter = Filter() 

    while True: 
     try: 
      currentLine = readFile.readline() 
     except ValueError: 
      break 
     if len(currentLine) ==0:     # Breaks at the end of the file 
      break 
     if len(currentLine) < 35:     # Deletes wrong lines (too short) 
      continue 
     LatLongList = myFilter.LatLong(currentLine) 
     for key in dico: 
      if myFilter.LatLongFilter(LatLongList,dico[key][0]): 
       myFilter.writeLine(key,currentLine) 


########################################################################### 
       # Main 
########################################################################### 

# Open read files: 
readFile = open(config.readFileList[LauncherCount.LauncherCount][1], 'r') 

# Generate writing files: 
pathDico = {} 
filterDico = config.filterDico 

# Create outputs 
for key in filterDico: 
    output_Name = config.readFileList[LauncherCount.LauncherCount][0][:-4] 
        + '_' + key +'.log' 
    pathDico[output_Name] = config.writingFolder + output_Name 
    filterDico[key] = [filterDico[key],open(pathDico[output_Name],'w')] 


p = [] 
CPUCount = multiprocessing.cpu_count() 
CPURange = range(CPUCount) 

startingTime = time.localtime() 

if __name__ == '__main__': 
    ### Create and start processes: 
    for i in CPURange: 
     p.append(multiprocessing.Process(target = filteringProcess , 
              args = (filterDico,))) 
     p[i].start() 

    ### Kill processes: 
    while True: 
     if [p[i].is_alive() for i in CPURange] == [False for i in CPURange]: 
      readFile.close() 
      for key in config.filterDico: 
       config.filterDico[key][1].close() 
       print(key,"is Done!") 
       endTime = time.localtime() 
      break 

    print("Process started at:",startingTime) 
    print("And ended at:",endTime) 
+0

"これは、複数のテキストファイルで連続して起動できるようにすることです。"これは待ち行列のためのものと思われる。なぜこのためにキューを使用していないのですか? –

+0

入手したら、プロセス間で値と情報を交換するためにキューが使用されますか? 私がやりたいことは、プロセスを拡張して連続したファイルを処理できるようにするのではなく、プロセスが完了するまで待つことと、新しい入力ファイルで同じメソッドを持つ新しいプロセスを作成することです。 – kevad

+1

これは逆のようです。なぜ、ファイル名を待っているすべての読み込みキューがプロセスの束になっていないのでしょうか。 1つのプロセスが終了し、ファイル名が次のプロセスのキューに入れられます。そうすれば、同期は簡単です。キューから名前を読み込みます。仕事する;名前を別のキューに書き込みます。なぜあなたはそれをやっていないのですか? –

答えて

1

並行してグループ内のファイルで作業しながら、順番にファイルのグループを処理するには:

#!/usr/bin/env python 
from multiprocessing import Pool 

def work_on(args): 
    """Process a single file.""" 
    i, filename = args 
    print("working on %s" % (filename,)) 
    return i 

def files(): 
    """Generate input filenames to work on.""" 
    #NOTE: you could read the file list from a file, get it using glob.glob, etc 
    yield "inputfile1" 
    yield "inputfile2" 

def process_files(pool, filenames): 
    """Process filenames using pool of processes. 

    Wait for results. 
    """ 
    for result in pool.imap_unordered(work_on, enumerate(filenames)): 
     #NOTE: in general the files won't be processed in the original order 
     print(result) 

def main(): 
    p = Pool() 

    # to do "successive" multiprocessing 
    for filenames in [files(), ['other', 'bunch', 'of', 'files']]: 
     process_files(p, filenames) 

if __name__=="__main__": 
    main() 

前のものは持っていた後、それぞれのprocess_file()が順番に呼び出されすなわち、process_files()への異なる呼び出しからのファイルは、ではなく、が並列処理されています。

+0

私は各入力ファイルに同じ出力ファイルを使用しないので、基本的にファイルが終了するたびに、すべての出力ファイルを閉じ、新しいファイルを作成し、新しい入力と新しい出力で新しい一連のプロセスを起動しますどのように私はあなたのソリューションでこれを行うことができますか? – kevad

+1

@ user1154967:入力ファイル名に基づいて出力ファイル名を生成します。たとえば、 'output_filename = filename + '。output'' – jfs

+0

これは並列マルチプロセッシングを作成しますが、私は連続マルチプロセッシングを行う方法を探しています。ファイルとキャッシュメモリは入力ファイルが約35GBなので、DBサーバーから読み込んでいます。 – kevad

関連する問題