2012-08-20 1 views
7

LIFOに近い、あるいはFIFOに近い(例えば、ランダムな)動作でもない人は、multiprocessing.Queueから清潔な方法を知っていますか?マルチプロセッシングからLIFOに近い動作を得るきれいな方法.Queue? (または、FIFOの近くではないだけでも)

代替の質問:実際のストレージ構造を管理するスレッドのコードに誰かを指摘できますか?multiprocessing.Queue LIFOアクセスを提供するのはそんなに簡単ではないようですが、私はウサギの穴でそれを見つけようとしています。

注:

  1. 私はmultiprocessing.Queuedoes not guarantee orderを信じています。ファイン。しかし、FIFOに近いので、LIFOに近いでしょう。
  2. 私は現在のアイテムをすべてキューから取り除いて作業をする前にその順序を逆にすることができますが、可能であればkludgeを避けることをお勧めします。

明確にする(編集):私はmultiprocessingとCPUバウンドシミュレーションをやっているので、Queueから専門的なキューを使用することはできません。私は数日間何の答えも見なかったので、上記の代替質問を追加しました。場合


それは、以下の問題であるmultiprocessing.Queueが近く-FIFOであることを、わずかな証拠があります。

import multiprocessing as mp 
import Queue 

q = mp.Queue() 

for i in xrange(1000): 
    q.put(i) 

deltas = [] 
while True: 
    try: 
     value1 = q.get(timeout=0.1) 
     value2 = q.get(timeout=0.1) 
     deltas.append(value2-value1) 
    except Queue.Empty: 
     break 

#positive deltas would indicate the numbers are coming out in increasing order 
min_delta, max_delta = min(deltas), max(deltas) 
avg_delta = sum(deltas)/len(deltas) 

print "min", min_delta 
print "max", max_delta 
print "avg", avg_delta 

プリント:それは単純なケース(シングルスレッド)で、それは私のシステムでFIFO完璧であることを示している分、最大、および平均は正確に1が(完璧FIFO)

+1

賢いテスト... – mgilson

+0

すべての追加が完了した後でLIFOデータが必要なのですか、または新しい値がまだ追加されている間に最新のデータを取得したいのですか?前者の場合は、キューの内容を逆にするのが最も簡単です。あなたがLIFOアクセスを "生きて"いるなら、多分 'マルチプロセッシング'モジュールからの共有メモリプリミティブを使って独自のデータ構造を書く必要があります。 – Blckknght

+0

@Blckknghtすべてがそこに来るまで待つことができれば、それはかなり簡単です(オプション2)が、キューがスタックとして動作するようにするための継続的なシミュレーションです。私はプリミティブを素早く見て、キュー処理スレッドをカスタマイズすることを望んでいましたが、その頭や尾を作ることができませんでした。私が簡単な方法を見つけることができない場合、それを調査することは私の次のステップです。コメントありがとう! – KobeJohn

答えて

2

私はPythonのインストール(Pythonの2.7でLib/multiprocessing/queues.pyに住んでいるキュークラス上で見てきましたPython 3.2のバージョンとは明らかに違いはありません)。

キューオブジェクトによって管理されるオブジェクトは2セットあります。 1つのセットは、すべてのプロセスによって共有されるマルチプロセスセーフのプリミティブです。他は、各プロセスによって別々に作成され、使用されます。

オブジェクトが__init__方法で設定されたクロスプロセス:

  1. Pipeオブジェクト、両端がself._readerself._writerとして保存されます。
  2. BoundedSemaphoreオブジェクトで、キュー内のオブジェクトの数を数えます(オプションで制限します)。
  3. Lockパイプを読み取るオブジェクトで、Windows以外のプラットフォームでは別のオブジェクトを書き込むオブジェクトです。(私は、パイプへの書き込みは、Windows上で本質的にマルチ・安全であるためであると仮定します。)

プロセスごとのオブジェクトが_after_fork_start_thread方法で設定されています

  1. collections.dequeオブジェクトをパイプへの書き込みをバッファリングするために使用されます。
  2. threading.conditionバッファが空でないことを通知するオブジェクトです。
  3. threading.Thread実際の書き込みを行うオブジェクトです。それは遅れて作成されるので、キューへの少なくとも1つの書き込みが所定のプロセスで要求されるまでは存在しません。
  4. さまざまなFinalizeオブジェクトは、プロセスが終了するときれいになる。

キューからのgetはかなり簡単です。読み取りロックを取得し、セマフォを減らし、オブジェクトをパイプの読み取り側から取得します。

putはもっと複雑です。複数のスレッドを使用します。 putへの呼び出し側は、条件のロックを取得し、そのオブジェクトをバッファに追加し、ロックを解除する前に条件を通知します。セマフォをインクリメントし、ライタスレッドがまだ実行されていない場合は起動します。

メソッドで書き込みスレッドが永遠に(キャンセルされるまで)ループします。バッファが空の場合は、notemptyの状態で待機します。次に、バッファからアイテムを取り出し、書き込みロックを取得し(存在する場合)、アイテムをパイプに書き込みます。


だから、すべてのことを考えれば、それを変更してLIFOキューを取得できますか?それは容易ではないようです。パイプは本質的にFIFOオブジェクトであり、待ち行列は(複数のプロセスからの書き込みの非同期性のために)FIFOの動作を全体的に保証することはできませんが、常にほとんどFIFOになるでしょう。

コンシューマが1つだけの場合は、キューからオブジェクトgetを取得し、独自のプロセスローカルスタックに追加できます。マルチ消費者スタックを行うことは難しくなりますが、共有メモリでは、バウンド・サイズのスタックはそれほど難しくありません。ロック、条件のペア(完全な状態と空の状態のブロッキング/シグナリングのため)、共有された整数値(保持されている値の数)、適切な型の共有配列(値そのもの)が必要です。

+0

私はこれに取り組む時間を作ることができるとき、私はmp.Queueを通してあなたの素晴らしいツアーに続き、私が見ることができるものを見ます。私はデータが "popleft"に簡単ないくつかのオブジェクトで保持されていることを期待していたが、あなたが言ったようにパイプだと思う。いずれにせよ、ここでのあなたの説明は、mp.Queueがどのように動作するかについてインターネット上で最高です!どうもありがとうございました。 – KobeJohn

1

がありますQueueパッケージ(Python 3のキュー)にはLIFO queueが含まれています。これは、マルチプロセッシングまたはマルチプロセッシングのキューモジュールでは公開されません。

q = mp.Queue()q = Queue.LifoQueue()に置き換えて、最小、最大、平均を正確に-1として印刷します。

(また、私は唯一つのスレッドからアイテムを取得するときは、必ず正確にFIFO/LIFO順を取得するべきだと思います。)

+0

提案していただきありがとうございます。残念ながら、実際にマルチプロセッシングを使用するCPUバウンドシミュレーションを行っています。私の知る限りでは、標準キューはプロセスを処理できません。私はそれについてのいくつかの詳細と私の質問を更新します。 – KobeJohn

関連する問題