2012-09-06 22 views
8

私は、クライアントから送信されたメッセージに反応するアプリケーションを持っています。 1つのメッセージはreload_credentialsです。新しいクライアントが登録されると、アプリケーションはそのメッセージを受信します。このメッセージは、PostgreSQLデータベースに接続し、すべての資格情報を照会し、通常のRubyハッシュ(client_id => client_token)に格納します。EventMachineを使用してこのユースケースをどのように処理する必要がありますか?

アプリケーションが受け取る可能性のあるその他のメッセージには、いくつかのセッション時間を記録するために使用されるstart,stoppauseなどがあります。私のポイントは、私は、アプリケーションが次のように機能して想像することです:

  • クライアントは
  • メッセージが
  • キューはIドン、例えば、しかし

を処理されているキューに入れられますメッセージを送信します原子炉をブロックしたい。さらに、次のキューにあるメッセージreload_credentialsがあるとしましょう。資格情報がDBから再ロードされるまで、キューからの他のメッセージは処理されません。また、(資格情報クエリが終了するのを待つような)特定のメッセージを処理している間に、他のメッセージをエンキューすることもできます。

このような問題を解決するために私を案内してください。私はem-synchronyを使わなければならないかもしれないと思っていますが、わかりません。

答えて

7

リアクタをブロックしないように、PostgreSQL EMドライバまたはEM.deferのいずれかを使用してください。

'reload_credentials'メッセージを受け取ると、後続のすべてのメッセージをエンキューするフラグを反転するだけです。 「reload_credentials」が完了したら、キューからすべてのメッセージを処理します。キューが空になった後、メッセージを受信したときに処理されるフラグを反転します。 PostgreSQLの

EMドライバはここに記載されています:https://github.com/eventmachine/eventmachine/wiki/Protocol-Implementations

module Server 
    def post_init 
    @queue    = [] 
    @loading_credentials = false 
    end 

    def recieve_message(type, data) 
    return @queue << [type, data] if @loading_credentials || [email protected]? 
    return process_msg(type, data) unless :reload_credentials == type 
    @loading_credentials = true 
    reload_credentials do 
     @loading_credentials = false 
     process_queue 
    end 
    end 

    def reload_credentials(&when_done) 
    EM.defer(proc { query_and_load_credentials }, when_done) 
    end 


    def process_queue 
    while (type, data = @queue.shift) 
     process_msg(type, data) 
    end 
    end 

    # lots of other methods 
end 

EM.start_server(HOST, PORT, Server) 

あなたはすべての接続は、任意の接続は、あなたがeigenclassを経由して調整する必要があります「reload_connections」というメッセージを受信するたびにメッセージをキューにしたい場合。

+0

しかし、reload_credentialsメッセージは複数回受信される可能性があります。 2つのスレッドのようにしてはいけませんか? 1つはキューイングを続け、もう1つは処理していますか? – Geo

+0

はい、他のreload_credentialsが処理されている間にreload_credentialsを受信すると、他のメッセージのようにキューに入れられます。 – simulacre

+0

複数のreload_credentialsメッシュは、最初のメッシュと同様に処理する必要があります。 reload_credentialsをEM.deferブロックに入れることで、別のスレッドで実行しています。あなたの '処理'コードが非ブロックである限り、あなたはメッセージを受け取り続けます。 EM互換ライブラリを使用して、ブロックしないようにしてください。あるいは、処理のためにEM.deferを使用してください。 – simulacre

4

次は、私は推測である、あなたの現在の実装のようなもの:

class Worker 
     def initialize queue 
     @queue = queue 
     dequeue 
     end 

     def dequeue 
     @queue.pop do |item| 
      begin 
      work_on item 
      ensure 
      dequeue 
      end 
     end 
     end 

     def work_on item 
     case item.type 
     when :reload_credentials 
      # magic happens here 
     else 
      # more magic happens here 
     end 
     end 
    end 


    q = EM::Queue.new 

    workers = Array.new(10) { Worker.new q } 

上記の問題、私が正しくあなたを理解していれば、あなたは労働者がその新しいジョブ(仕事に取り組んでいたくないということですreload_credentialsジョブよりも早く、プロデューサのタイムラインに到着しました)。以下の人がこれを補うべきです(最後に注意してください)。

class Worker 
     def initialize queue 
     @queue = queue 
     dequeue 
     end 

     def dequeue 
     @queue.pop do |item| 
      begin 
      work_on item 
      ensure 
      dequeue 
      end 
     end 
     end 

     def work_on item 
     case item.type 
     when :reload_credentials 
      # magic happens here 
     else 
      # more magic happens here 
     end 
     end 
    end 

    class LockingDispatcher 
     def initialize channel, queue 
     @channel = channel 
     @queue = queue 

     @backlog = [] 
     @channel.subscribe method(:dispatch_with_locking) 

     @locked = false 
     end 

     def dispatch_with_locking item 
     if locked? 
      @backlog << item 
     else 
      # You probably want to move the specialization here out into a method or 
      # block that's passed into the constructor, to make the lockingdispatcher 
      # more of a generic processor 
      case item.type 
      when :reload_credentials 
      lock 
      deferrable = CredentialReloader.new(item).start 
      deferrable.callback { unlock } 
      deferrable.errback { unlock } 
      else 
      dispatch_without_locking item 
      end 
     end 
     end 

     def dispatch_without_locking item 
     @queue << item 
     end 

     def locked? 
     @locked 
     end 

     def lock 
     @locked = true 
     end 

     def unlock 
     @locked = false 
     bl = @backlog.dup 
     @backlog.clear 
     bl.each { |item| dispatch_with_locking item } 
     end 

    end 

    channel = EM::Channel.new 
    queue = EM::Queue.new 

    dispatcher = LockingDispatcher.new channel, queue 

    workers = Array.new(10) { Worker.new queue } 

だから、最初のシステムへの入力はqに入ってきますが、この新システムでは、channelに入ってきます。 queueは依然として作業者間の作業分散に使用されますが、資格情報のリフレッシュ操作が実行されている間はqueueは入力されません。残念ながら、私は時間がかかりませんでしたので、私はLockingDispatcherをアイテムタイプおよびコードCredentialsReloaderと結合しないように一般化していません。私はあなたにそれを残すでしょう。

これは、私があなたの元の要求を理解している間、この種の要件を緩和する方が一般的には良いことに注意してください。基本的にその要件の変化なしに根絶することはできませんいくつかの未解決の問題があります。

  • 始まる資格情報ジョブ
  • 前に完了したジョブを実行するために待機していないシステムを非常にひどく資格情報ジョブのバーストを処理するシステム - 処理可能な可能性のある他のアイテムは表示されません。
  • 資格コードにバグがあると、バックログがRAMをいっぱいにして失敗する可能性があります。単純なタイムアウトは、コードが中断可能で、その後のメッセージがさらにデッドロックを回避するために十分に処理可能であれば、壊滅的な影響を避けるのに十分であるかもしれません。

実際には、システムにユーザーIDの概念があるようです。要件を考えると、資格情報がリフレッシュ状態のユーザーIDに関連するアイテムのみをバックログする必要がある可能性があります。これは、異なる種類のディスパッチを含む別の問題です。それらのユーザのロックされたバックログのハッシュを試してください。資格情報の補完をコールバックして、それらのバックログをワーカーに排除するか、または同様の仕組みにしてください。

幸運を祈る!

関連する問題