2012-04-20 23 views
1

私は現在、レールプロセスと、(オブジェクトの作成のような)特定のイベントを通知する必要がある複数のワーカープロセスを備えたアーキテクチャを構築しています。Rubyメッセージバスの宝石はありますか?

プロセス(API、労働者、デーモンは、...)だけで、メッセージバスに加入し、ブロックが時にメッセージと呼ばれている間、私は、次の

class Article 
    after_creation do 
    MessageBus.send type: "article-created", id: self.id 
    end 
end 

を行うにはしたいと思い

Rails 
    |   API Worker 
    +----------o------o--------o------ - - - 
         Some other 
          daemon 

入っています。

MessageBus.subscribe do |msg| 
    if msg['type'] == 'article-created' 
    # if this is my websocket daemon, I would push the article to the browser 
    # if this is my indexing daemon, I would reindex the full-text search 
    # if this is ... you get the deal. 
    end 
end 

を現在、私はUNIXSocketでJSONを押してEventMachine.start_unix_domain_serverでそれを取得し、ローカルのUNIXドメインソケットを使用しています。しかし、これは双方向通信だけを可能にします。私はresqueの使用についても考えましたが、バスが必要な間にメッセージキューが増えています。それは赤目にかかっています。私は確かにルビーのいくつかのメッセージバスを実装している宝石がなければならないと確信していますが、グーグルでは結果につながっていません

+2

EventMachineチャンネルを試しましたか? EventMachineチャンネルの場合、http://eventmachine.rubyforge.org/EventMachine/Channel.html – tommasop

+0

+1。 (作業者がマスタと同じプロセス空間で実行している場合)もう一つ考慮すべきことは、RubyはObserverパターン[Observable](http://www.ruby-doc.org/stdlib-1.9 .3/libdoc/observer/rdoc/Observable.html) –

+2

参照:https://github.com/SamSaffron/message_bus –

答えて

1

最後に私はEventmachineチャンネルを使って簡単なソリューションをハッキングしました。

これは私のサーバーです。基本的にクライアントは/tmp/messagebus.sockに接続し、データを送信します。ソケットにプッシュされたものはすべて他のすべてのクライアントに送信されます。

require 'rubygems' 
require 'eventmachine' 

module Messagebus 
    class Server 
    attr_accessor :connections 
    attr_accessor :channel 

    def initialize 
     @connections = [] 
     @channel = EventMachine::Channel.new 
    end 

    def start 
     @signature = EventMachine.start_unix_domain_server '/tmp/messagebus.sock', Connection do |conn| 
     conn.server = self 
     end 
    end 

    def stop 
     EventMachine.stop_server(@signature) 

     unless wait_for_connections_and_stop 
     EventMachine.add_periodic_timer(1) { wait_for_connections_and_stop } 
     end 
    end 

    def wait_for_connections_and_stop 
     if @connections.empty? 
     EventMachine.stop 
     true 
     else 
     puts "Waiting for #{@connections.size} connection(s) to finish ..." 
     false 
     end 
    end 
    end 

    class Connection < EventMachine::Connection 
    attr_accessor :server 

    def post_init 
     log "Connection opened" 
    end 

    def server=(server) 
     @server = server 

     @subscription = server.channel.subscribe do |data| 
     self.log "Sending #{data}" 
     self.send_data data 
     end 
    end 

    def receive_data(data) 
     log "Received #{data}" 
     server.channel.push data 
    end 

    def unbind 
     server.channel.unsubscribe @subscription 
     server.connections.delete(self) 
     log "Connection closed" 
    end 

    def log(msg) 
     puts "[#{self.object_id}] #{msg}" 
    end 
    end 
end 

EventMachine::run { 
    s = Messagebus::Server.new 
    s.start 
    puts "New server listening" 
} 
関連する問題