私はサーバーの開発に取り組んでおり、テスト用のクライアントスクリプトを開発しました。このクライアントスクリプトは、複数の/多くのコマンドプロセッサを起動し、それぞれを "ruby client.rb"で起動するとうまく動作します。このプロセスをより効率的にするために、スレッドを使用したいクライアントを起動できる親スクリプト "threads.rb"を作成しようとしています。それらはすべて始まりますが、最後のスレッドのみがthreads.rbを使用すると効果的に実行されます。スレッド/ロードのみを使用して複数のRubyスクリプトを実行すると、1つの実行中のコピーが生成されます。
client.rbはRedisにアクセスし、衝突を防ぐためにそこにミューテックスを使用しています。私はクライアントの他の部分にロックをかけておらず、その中の問題を防ぐために何もしていません。 client.rbが
threads.rb複数のコマンドプロセッサで実行することができるので、私は、threads.rbを使用できるようにしたいと思います:
threads = 4
the_threads = []
(1..threads).each do |thread|
puts "Creating thread #{thread}"
the_threads << Thread.new(thread) do |t|
puts "Running Client #{thread};\n"
load "./samples/client.rb"
end
end
client.rb
%w(rubygems bundler faye/websocket eventmachine json redis redis-classy redis-mutex).each { |m| require m }
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
class ClientWs
def self.em_run
EM.run do
puts "EM is running."
uri = 'localhost'
redis_uri = URI.parse("redis://127.0.0.1:6379")
redis ||= Redis.new(host: redis_uri.host, port: redis_uri.port, password: redis_uri.password)
RedisClassy.redis = Redis.new(host: redis_uri.host, port: redis_uri.port, password: redis_uri.password)
mutex = RedisMutex.new(:device, block: 0.5, sleep: 0.5)
if mutex.lock
device = 0
channel = 100
type = 'Reader'
begin
channel += 1
type = type == 'Display' ? 'Reader' : 'Display'
device += 1 if type == 'Display'
lockbox = "Lbox_#{device}"
unit = "#{lockbox}.#{type}.Status"
status = redis.get(unit)
end until status.nil? || !status
channel = type == 'Reader' ? "RDR:#{channel}" : "DSP:#{channel}"
result = redis.set(unit, 'Offline')
puts "Connecting with:#{unit}; Using channel:#{channel};"
url = uri == 'localhost' ? "ws://#{uri}:3000/#{channel}" : "ws://#{uri}/#{channel}"
puts url
@ws = Faye::WebSocket::Client.new(url)
start = Time.now
count ||= 0
mutex.unlock
else
puts 'RedisMutex failed to obtain mutex lock.'
exit
end
timer = EventMachine.add_periodic_timer(5+rand(5)) {
count += 1
count.even? ? send({"ECHO": {"CMD": "PING"}}) : send({"CMD": "PING"})
Thread.pass
}
@ws.on :open do |event|
ClientWs.send({"OPEN": lockbox})
end
@ws.on :message do |event|
@ip_address ||= Addrinfo.ip(URI.parse(event.target.url).host).ip_address
begin
parsed = JSON.parse event.data
rescue => e
puts ">>>> [Error! Failed to parse JSON]"
puts ">>>> [#{e.message}]"
puts ">>>> #{event.data}"
end
puts ">> #{@ip_address}:#{channel}:#{event.data};"
#@ws.close
end
@ws.on :close do |event|
timer.cancel
stop = Time.now - start
result = redis.del(unit)
puts "#{stop} seconds;"
p [:close, event.code, event.reason]
ws = nil
ClientWs.em_run
end
end
end
def self.send message
payload = message.is_a?(Hash) ? message : {payload: message}
@ws.send(payload.to_json)
end
end
ClientWs.em_run