次は、私は推測である、あなたの現在の実装のようなもの:
class Worker
def initialize queue
@queue = queue
dequeue
end
def dequeue
@queue.pop do |item|
begin
work_on item
ensure
dequeue
end
end
end
def work_on item
case item.type
when :reload_credentials
# magic happens here
else
# more magic happens here
end
end
end
q = EM::Queue.new
workers = Array.new(10) { Worker.new q }
上記の問題、私が正しくあなたを理解していれば、あなたは労働者がその新しいジョブ(仕事に取り組んでいたくないということですreload_credentialsジョブよりも早く、プロデューサのタイムラインに到着しました)。以下の人がこれを補うべきです(最後に注意してください)。
class Worker
def initialize queue
@queue = queue
dequeue
end
def dequeue
@queue.pop do |item|
begin
work_on item
ensure
dequeue
end
end
end
def work_on item
case item.type
when :reload_credentials
# magic happens here
else
# more magic happens here
end
end
end
class LockingDispatcher
def initialize channel, queue
@channel = channel
@queue = queue
@backlog = []
@channel.subscribe method(:dispatch_with_locking)
@locked = false
end
def dispatch_with_locking item
if locked?
@backlog << item
else
# You probably want to move the specialization here out into a method or
# block that's passed into the constructor, to make the lockingdispatcher
# more of a generic processor
case item.type
when :reload_credentials
lock
deferrable = CredentialReloader.new(item).start
deferrable.callback { unlock }
deferrable.errback { unlock }
else
dispatch_without_locking item
end
end
end
def dispatch_without_locking item
@queue << item
end
def locked?
@locked
end
def lock
@locked = true
end
def unlock
@locked = false
bl = @backlog.dup
@backlog.clear
bl.each { |item| dispatch_with_locking item }
end
end
channel = EM::Channel.new
queue = EM::Queue.new
dispatcher = LockingDispatcher.new channel, queue
workers = Array.new(10) { Worker.new queue }
だから、最初のシステムへの入力はq
に入ってきますが、この新システムでは、channel
に入ってきます。 queue
は依然として作業者間の作業分散に使用されますが、資格情報のリフレッシュ操作が実行されている間はqueue
は入力されません。残念ながら、私は時間がかかりませんでしたので、私はLockingDispatcher
をアイテムタイプおよびコードCredentialsReloader
と結合しないように一般化していません。私はあなたにそれを残すでしょう。
これは、私があなたの元の要求を理解している間、この種の要件を緩和する方が一般的には良いことに注意してください。基本的にその要件の変化なしに根絶することはできませんいくつかの未解決の問題があります。
- 始まる資格情報ジョブ
- 前に完了したジョブを実行するために待機していないシステムを非常にひどく資格情報ジョブのバーストを処理するシステム - 処理可能な可能性のある他のアイテムは表示されません。
- 資格コードにバグがあると、バックログがRAMをいっぱいにして失敗する可能性があります。単純なタイムアウトは、コードが中断可能で、その後のメッセージがさらにデッドロックを回避するために十分に処理可能であれば、壊滅的な影響を避けるのに十分であるかもしれません。
実際には、システムにユーザーIDの概念があるようです。要件を考えると、資格情報がリフレッシュ状態のユーザーIDに関連するアイテムのみをバックログする必要がある可能性があります。これは、異なる種類のディスパッチを含む別の問題です。それらのユーザのロックされたバックログのハッシュを試してください。資格情報の補完をコールバックして、それらのバックログをワーカーに排除するか、または同様の仕組みにしてください。
幸運を祈る!
しかし、reload_credentialsメッセージは複数回受信される可能性があります。 2つのスレッドのようにしてはいけませんか? 1つはキューイングを続け、もう1つは処理していますか? – Geo
はい、他のreload_credentialsが処理されている間にreload_credentialsを受信すると、他のメッセージのようにキューに入れられます。 – simulacre
複数のreload_credentialsメッシュは、最初のメッシュと同様に処理する必要があります。 reload_credentialsをEM.deferブロックに入れることで、別のスレッドで実行しています。あなたの '処理'コードが非ブロックである限り、あなたはメッセージを受け取り続けます。 EM互換ライブラリを使用して、ブロックしないようにしてください。あるいは、処理のためにEM.deferを使用してください。 – simulacre