EventMachine
とamqp
とrails
を使用する方法がわかりません。イベントマシンとRailsを使用する複数の作業者
私が見つけたチュートリアルのほとんどは、キューに追加できる「プロデューサ」を設定するためのものでした。私は逆を実行する必要があります、私はキューにアイテムを提出するPythonアプリケーションがあり、私は、キューからアイテムをピックアップし、それらを処理するために、レールアプリケーションが必要です。
複数のキューが処理される必要があるため、キューごとに複数のワーカーを停止させたい場合や、可能であればワーカーの数を調整する必要があります。
例:hereに続いて何かが実行されています。コードは古くなっていますが、何かを動かすことができました。私はそれを下に貼りました。 1つの大きな障害はJrubyを使用しているので、EventMachine.fork
を使用できません。これに代わるものはありますか?
m_q_helper.rb:
require 'rubygems'
require 'amqp'
require 'benchmark'
require 'json'
module MQHelper
def log message
puts "#{MQ.id}: #{message}"
$stdout.flush
end
def logp *args
print args
$stdout.flush
end
def graceful_death
AMQP.stop{ EM.stop }
exit(0)
end
protected
def load_rails_environment
mark = Benchmark.realtime do
require 'config/environment'
end
log "loaded rails environment... #{mark} seconds"
end
def infrastructure(&block)
mark = Benchmark.realtime do
block.call
end
log "loading required infrastructure... #{mark} seconds"
end
def serialize data
JSON.dump(data)
end
def unserialize data
JSON.parse(data)
end
end
processor.rb: #コア は、 '日付'
# Gems
require 'rubygems'
require 'amqp'
# Helper
require 'lib/m_q_helper.rb'
# For ack to work appropriately you must shutdown AMQP gracefully,
# otherwise all items in your queue will be returned
Signal.trap('INT') {
unless EM.forks.empty?
EM.forks.each do |pid|
Process.kill('KILL', pid)
end
end
AMQP.stop{ EM.stop }
exit(0)
}
Signal.trap('TERM') {
unless EM.forks.empty?
EM.forks.each do |pid|
Process.kill('KILL', pid)
end
end
AMQP.stop{ EM.stop }
exit(0)
}
# spawn workers
workers = ARGV[1] ? (Integer(ARGV[1]) rescue 1) : 1
# puts "workers: #{ workers }"
EventMachine.run do
AMQP.start do
class Processer
attr_reader :options, :connection, :channel
include MQHelper
def initialize env
@options = env
load_rails_environment
@connection = AMQP.connect(
:host => "localhost",
:vhost => "/",
:username => "guest",
:password => "guest",
:port => 5672
)
@channel = AMQP::Channel.new(connection)
puts "Connected to AMQP Broker."
end
# entry point - `process.rb "users"`
def process
section = ARGV[0]
if section.nil?
log "You must supply a section to process (users, clients, etc..)"
graceful_death
else
case section
when "stuff"
process_item
when "other_stuff"
process_item_differently
else
raise NotImplementedError
end
end
end # end process
protected
# update legacy users to new users model
def process_item_differently
exchange = @channel.topic("topic")
queue = @channel.queue("othertopic.name").bind(exchange, :routing_key => "othertopic.name")
run_process(queue) do |data|
do_other_stuff
end
end
def process_item
exchange = @channel.topic("topic")
queue = @channel.queue("topic.name").bind(exchange, :routing_key => "topic.name")
run_process(queue) do |data|
do_stuff
end
end # end process user
def run_process(queue, &block)
queue.subscribe(:ack => true) { |headers, payload|
data = unserialize(payload)
block.call(data)
headers.ack
}
end # end run process
end # end Processer
Processer.new(ENV).process
end # end AMQP.start
end # end EM.fork
# # wait on forks
# while !EM.forks.empty?
# sleep(5)
# end
私はAMQPをあまり使っていませんでしたが、あなたのキューに登録されているXスレッドを生成するプロセスを生成する代わりに、スレッドルートを試みることができると思います。 – Schmurfy
そして、私はevent_machineでどうすればいいのですか? – zsquare