2012-01-12 9 views
0

EventMachineamqprailsを使用する方法がわかりません。イベントマシンとRailsを使用する複数の作業者

私が見つけたチュートリアルのほとんどは、キューに追加できる「プロデューサ」を設定するためのものでした。私は逆を実行する必要があります、私はキューにアイテムを提出するPythonアプリケーションがあり、私は、キューからアイテムをピックアップし、それらを処理するために、レールアプリケーションが必要です。

複数のキューが処理される必要があるため、キューごとに複数のワーカーを停止させたい場合や、可能であればワーカーの数を調整する必要があります。

例:hereに続いて何かが実行されています。コードは古くなっていますが、何かを動かすことができました。私はそれを下に貼りました。 1つの大きな障害はJrubyを使用しているので、EventMachine.forkを使用できません。これに代わるものはありますか?

m_q_helper.rb:

require 'rubygems' 
require 'amqp' 
require 'benchmark' 
require 'json' 

module MQHelper 

    def log message 
    puts "#{MQ.id}: #{message}" 
    $stdout.flush 
    end 

    def logp *args 
    print args 
    $stdout.flush 
    end 

    def graceful_death 
    AMQP.stop{ EM.stop } 
    exit(0) 
    end 

    protected 

    def load_rails_environment 
    mark = Benchmark.realtime do 
     require 'config/environment' 
    end 
    log "loaded rails environment... #{mark} seconds" 
    end 

    def infrastructure(&block) 
    mark = Benchmark.realtime do 
     block.call 
    end 
    log "loading required infrastructure... #{mark} seconds" 
    end 

    def serialize data 
    JSON.dump(data) 
    end 

    def unserialize data 
    JSON.parse(data) 
    end 
end 

processor.rb: #コア は、 '日付'

# Gems 
require 'rubygems' 
require 'amqp' 

# Helper 
require 'lib/m_q_helper.rb' 

# For ack to work appropriately you must shutdown AMQP gracefully, 
# otherwise all items in your queue will be returned 
Signal.trap('INT') { 
    unless EM.forks.empty? 
    EM.forks.each do |pid| 
     Process.kill('KILL', pid) 
    end 
    end 
    AMQP.stop{ EM.stop } 
    exit(0) 
} 
Signal.trap('TERM') { 
    unless EM.forks.empty? 
    EM.forks.each do |pid| 
     Process.kill('KILL', pid) 
    end 
    end 
    AMQP.stop{ EM.stop } 
    exit(0) 
} 

# spawn workers 
workers = ARGV[1] ? (Integer(ARGV[1]) rescue 1) : 1 
# puts "workers: #{ workers }" 
EventMachine.run do 
    AMQP.start do 

    class Processer 

     attr_reader :options, :connection, :channel 

     include MQHelper 

     def initialize env 
     @options = env 

     load_rails_environment 

     @connection = AMQP.connect( 
            :host => "localhost", 
            :vhost => "/", 
            :username => "guest", 
            :password => "guest", 
            :port => 5672 
           ) 
     @channel = AMQP::Channel.new(connection) 

     puts "Connected to AMQP Broker." 

     end 

     # entry point - `process.rb "users"` 
     def process 
     section = ARGV[0] 

     if section.nil? 
      log "You must supply a section to process (users, clients, etc..)" 
      graceful_death 
     else 
      case section 
      when "stuff" 
      process_item 
      when "other_stuff" 
      process_item_differently 
      else 
      raise NotImplementedError 
      end 
     end 
     end # end process 

     protected 

     # update legacy users to new users model 
     def process_item_differently 
     exchange = @channel.topic("topic") 
     queue = @channel.queue("othertopic.name").bind(exchange, :routing_key => "othertopic.name") 

     run_process(queue) do |data| 
      do_other_stuff 
     end 
     end 

     def process_item 
     exchange = @channel.topic("topic") 
     queue = @channel.queue("topic.name").bind(exchange, :routing_key => "topic.name") 

     run_process(queue) do |data| 
      do_stuff 
     end 
     end # end process user 

     def run_process(queue, &block) 
     queue.subscribe(:ack => true) { |headers, payload| 
      data = unserialize(payload) 
      block.call(data) 
      headers.ack 
     } 
     end # end run process 

    end # end Processer 

    Processer.new(ENV).process 
    end # end AMQP.start 
end # end EM.fork 

# # wait on forks 
# while !EM.forks.empty? 
# sleep(5) 
# end 
+0

私はAMQPをあまり使っていませんでしたが、あなたのキューに登録されているXスレッドを生成するプロセスを生成する代わりに、スレッドルートを試みることができると思います。 – Schmurfy

+0

そして、私はevent_machineでどうすればいいのですか? – zsquare

答えて

1

を必要とする1つのキューに聞くと、スレッドのプールにメッセージ処理を延期:

require 'amqp' 

EM::threadpool_size = 20 

EM::run do 
    connection = AMQP.connect(:host => '127.0.0.1') 
    puts "Connecting to AMQP broker. Running #{AMQP::VERSION} version of the gem..." 

    channel = AMQP::Channel.new(connection) 
    queue = channel.queue("amqpgem.examples.hello_world", :auto_delete => true) 
    exchange = channel.default_exchange 

    queue.subscribe do |payload| 
    # use the eventmachine thread pool so we don't block 
    # the reactor 
    EM::defer do 
     # handle the payload received here 
    end 
    end 

end 

私はライブラリーのこんにちはの世界から始めました私はそれが動作すると思うが、私はそれをテストしていない、あなたはコードが異なる場合でもアイデアを得る必要があります。 メッセージが原子炉のスレッドで処理される代わりに到着すると、メッセージはイベントマシンのスレッドプールキューにキューイングされ、スレッドが使用可能になると直ちにジョブが使用されます(またはすべてが使用可能である場合はすぐに)。

ファイバーを使用することもできますが、それらに精通していない場合はスレッドを貼る必要があります。

+0

答えをありがとう。しかし、この場合、タスクは引き続きフォアグラウンドで実行されます。デーモンとして動作させるにはどうしたらいいですか? – zsquare

+0

「フォアグラウンド」とは何を意味しますか?アプリケーション全体について話していますか?私の最善の提案は、daemontoolsやsupervisordのようなプロセスを実行して監視するための外部ツールを使用することです。 – Schmurfy

+0

はい、それは私が意味するものです。スーパーバイザーを使っているようです。ありがとう! – zsquare

関連する問題