2011-09-26 10 views
18

私が作った別の記事と同様に、これはその投稿に答えて新しい質問を作成します。DB接続を作成し、複数のプロセス(マルチプロセッシング)を維持する

要約:ポリゴンのデータセットをオーバーレイするポイントのデータセットを持つ空間データベースのすべてのレコードを更新する必要があります。それぞれのポイントフィーチャに対して、そのフィーチャがその中にあるポリゴンフィーチャに関連付けるキーを割り当てたいと思います。だから私のポイント 'ニューヨークシティ'がポリゴンUSA内にあり、アメリカポリゴン 'GID = 1'の場合は、ニューヨークのポイントに 'gid_fkey = 1'を割り当てます。

これはマルチプロセッシングを使って実現しました。私はこれを使用して速度が150%向上したことに気付きました。しかし、私は、レコードごとに1つのDB接続が必要なので、必要以上のオーバーヘッドがあると思います。だからここ

はコードです:

import multiprocessing, time, psycopg2 

class Consumer(multiprocessing.Process): 

    def __init__(self, task_queue, result_queue): 
     multiprocessing.Process.__init__(self) 
     self.task_queue = task_queue 
     self.result_queue = result_queue 

    def run(self): 
     proc_name = self.name 
     while True: 
      next_task = self.task_queue.get() 
      if next_task is None: 
       print 'Tasks Complete' 
       self.task_queue.task_done() 
       break    
      answer = next_task() 
      self.task_queue.task_done() 
      self.result_queue.put(answer) 
     return 


class Task(object): 
    def __init__(self, a): 
     self.a = a 

    def __call__(self):   
     pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
     pyConn.set_isolation_level(0) 
     pyCursor1 = pyConn.cursor() 

     procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a) 

     pyCursor1.execute(procQuery) 
     print 'What is self?' 
     print self.a 

     return self.a 

    def __str__(self): 
     return 'ARC' 
    def run(self): 
     print 'IN' 

if __name__ == '__main__': 
    tasks = multiprocessing.JoinableQueue() 
    results = multiprocessing.Queue() 

    num_consumers = multiprocessing.cpu_count() * 2 
    consumers = [Consumer(tasks, results) for i in xrange(num_consumers)] 
    for w in consumers: 
     w.start() 

    pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
    pyConnX.set_isolation_level(0) 
    pyCursorX = pyConnX.cursor() 

    pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')  
    temp = pyCursorX.fetchall()  
    num_job = temp[0] 
    num_jobs = num_job[0] 

    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')  
    cityIdListTuple = pyCursorX.fetchall()  

    cityIdListList = [] 

    for x in cityIdListTuple: 
     cityIdList.append(x[0]) 


    for i in xrange(num_jobs): 
     tasks.put(Task(cityIdList[i - 1])) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    while num_jobs: 
     result = results.get() 
     print result 
     num_jobs -= 1 

それは私が「時間」モジュールでそれを測定する必要があり、接続あたり0.3と1.5秒の間になりそうです。

プロセスごとにDB接続を作成し、city_id情報を変数として使用して、このオープンでカーソルのクエリを入力できる方法はありますか?このようにして、DB接続でそれぞれ4つのプロセスを作成し、処理するために何らかの形でcity_idをドロップします。

答えて

31

は、実行タスクにそれを与える、消費者コンストラクタでの接続の作成を明確にしてください:御馳走を働いた

import multiprocessing, time, psycopg2 

class Consumer(multiprocessing.Process): 

    def __init__(self, task_queue, result_queue): 
     multiprocessing.Process.__init__(self) 
     self.task_queue = task_queue 
     self.result_queue = result_queue 
     self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
     self.pyConn.set_isolation_level(0) 


    def run(self): 
     proc_name = self.name 
     while True: 
      next_task = self.task_queue.get() 
      if next_task is None: 
       print 'Tasks Complete' 
       self.task_queue.task_done() 
       break    
      answer = next_task(connection=self.pyConn) 
      self.task_queue.task_done() 
      self.result_queue.put(answer) 
     return 


class Task(object): 
    def __init__(self, a): 
     self.a = a 

    def __call__(self, connection=None):   
     pyConn = connection 
     pyCursor1 = pyConn.cursor() 

     procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a) 

     pyCursor1.execute(procQuery) 
     print 'What is self?' 
     print self.a 

     return self.a 

    def __str__(self): 
     return 'ARC' 
    def run(self): 
     print 'IN' 
+1

メイト。あなたに承認の刻印を与える名誉を持っていないが、そのコードは絶対に魔法でした。一定のDB接続を取り除くことで、速度をさらに50%増加させることが容易になりました。おそらく100%に近い場合もあります。再度、感謝します。 –

+0

@EnE_:それはあなたを助けてくれてうれしいよ:)。あなたは答えを受け入れるべきです、あなたは質問の所有者であるため、あなたはそれを行う権利があります。 –

+0

さて、ティックではなく上向きの矢印を押してくださいと思っていたことを認めなければなりません。 '承認の刻み目'は、残念ながらフレーズ= D –

関連する問題