2013-02-07 6 views
8

こんにちは、sidekiqを使用して2つの別々のredisインスタンスで作業しますか?

私は2つの別々の、しかし関連するアプリがあります。彼らは両方とも独自のバックグラウンドキューを持つ必要があります(readys:separate Sidekiq & Redisプロセス)。しかし、時折、のキューにジョブをプッシュできるようにしたいと考えています。app1からです。 app1は、既存のSidekiq/Redisのスタック持っていなかった場合は、単純なキュー/プッシュ観点から

、これを行うのは簡単だろう。しかし、与えられた

# In a process, far far away 

# Configure client 
Sidekiq.configure_client do |config| 
    config.redis = { :url => 'redis://redis.example.com:7372/12', :namespace => 'mynamespace' } 
end 

# Push jobs without class definition 
Sidekiq::Client.push('class' => 'Example::Workers::Trace', 'args' => ['hello!']) 

# Push jobs overriding default's 
Sidekiq::Client.push('queue' => 'example', 'retry' => 3, 'class' =>  'Example::Workers::Trace', 'args' => ['hello!']) 

私はすでに呼び出されているであろうことをSidekiq.configure_clientSidekiq.configure_serverapp1から、ここに何かが起こる必要のあるステップがあります。

明らかに、Sidekiq内部からシリアル化と正規化コードを直接取り込み、手動でapp2のredisキューにプッシュすることはできますが、それは脆弱な解決策のようです。私はClient.push機能を使用できるようにしたいと思います。

私は私の理想的なソリューションのようないろいろ書いだろうとします

SidekiqTWO.configure_client { remote connection..... } SidekiqTWO::Client.push(job....)

あるいは:

明らかに少し冗談

$redis_remote = remote_connection.....

Sidekiq::Client.push(job, $redis_remote)

が、それは私です理想的なユースケース。

ありがとうございます!

+0

私はこれに200ポイントの賞金を提供しています - これは、サイドキッククライアントが高可用性と障害のために2つの異なるredisインスタンスにメッセージを「ラウンドロビン」することを可能にする点で、 -overover。 –

答えて

8

したがって、According to the FAQは、「Sidekiqメッセージフォーマットは非常に単純では安定しています:これは単なるJSON形式のハッシュです。強調しているのは、JSONをsidekiqに送るのはあまりにも脆弱だとは思わない。特に、OPの状況のように、Redisのインスタンスを細かく制御する必要がある場合は、ジョブをエンキューする際にRedisインスタンスを指定できる小さなラッパーを作成するだけです。

Redisのインスタンスへのラウンドロビンの仕事へのケビンBedellのより一般的な状況については、私はあなたがRedisのインスタンスがあなただけエンキューと持ちたいused--となっているコントロールを持っている必要はありません想像は、配布は自動的に管理されます。それはonly one person has requested this so farのように見える、とthey came up with a solutionRedis::Distributedを使用する:

datastore_config = YAML.load(ERB.new(File.read(File.join(Rails.root, "config", "redis.yml"))).result) 

datastore_config = datastore_config["defaults"].merge(datastore_config[::Rails.env]) 

if datastore_config[:host].is_a?(Array) 
    if datastore_config[:host].length == 1 
    datastore_config[:host] = datastore_config[:host].first 
    else 
    datastore_config = datastore_config[:host].map do |host| 
     host_has_port = host =~ /:\d+\z/ 

     if host_has_port 
     "redis://#{host}/#{datastore_config[:db] || 0}" 
     else 
     "redis://#{host}:#{datastore_config[:port] || 6379}/#{datastore_config[:db] || 0}" 
     end 
    end 
    end 
end 

Sidekiq.configure_server do |config| 
    config.redis = ::ConnectionPool.new(:size => Sidekiq.options[:concurrency] + 2, :timeout => 2) do 
    redis = if datastore_config.is_a? Array 
     Redis::Distributed.new(datastore_config) 
    else 
     Redis.new(datastore_config) 
    end 

    Redis::Namespace.new('resque', :redis => redis) 
    end 
end 

高可用性を取得し、フェイルオーバーをするためにあなたの探求に考慮すべきもう一つは、信頼性機能を備えてSidekiq Proを得ることです:「Sidekiq Proのクライアントが耐えることができます一時的なRedisの停止が発生した場合、エラーが発生した場合にジョブをローカルにエンキューし、接続が復旧したらそれらのジョブを配信しようとします。とにかくsidekiqはバックグラウンドプロセスのため、Redisインスタンスがダウンした場合には短い遅延がアプリケーションに影響してはなりません。 2つのRedisインスタンスのうち1つがダウンしてラウンドロビンを使用している場合、この機能を使用していない限り、まだ一部のジョブが失われています。

+0

このよく調査された答えをありがとう! –

1

carols10centsは非常にシンプルですが、私はいつもその機能をカプセル化して他のプロジェクトで再利用できるようにしているので、blog from Hotel Tonightからアイディアを更新しました。この次の解決策は、Rails 4.1 & Springプリローダーで生き残れないHotel Tonight'sを改善します。

現在、私はlib/remote_sidekiq/に次のファイルを追加することで間に合わ:

remote_sidekiq.rb

class RemoteSidekiq 
    class_attribute :redis_pool 
end 

あなたは初期化子を作成する必要がremote_sidekiq_worker.rb

require 'sidekiq' 
require 'sidekiq/client' 

module RemoteSidekiqWorker 
    def client 
    pool = RemoteSidekiq.redis_pool || Thread.current[:sidekiq_via_pool] || Sidekiq.redis_pool 
    Sidekiq::Client.new(pool) 
    end 

    def push(worker_name, attrs = [], queue_name = "default") 
    client.push('args' => attrs, 'class' => worker_name, 'queue' => queue_name) 
    end 
end 

それはredis_poolを設定します

設定/初期化子/ remote_sidekiq.rb

url = ENV.fetch("REDISCLOUD_URL") 
namespace = 'primary' 

redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url)) 

RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis } 

あなたはすることができます、あなたが好きな場所単にinclude RemoteSidekiqWorkerモジュール。仕事が終わった! RemoteWorkerモデルに追加MORE大規模な環境 ****

FOR

**** は、余分な利点を追加します。

  1. あなたはどこでもへのアクセス権を持つシステムを含むRemoteWorkersを再利用することができますターゲット側の作業員。これは発信者にとって透過的です。ターゲットSidekiqシステム内で "RemoteWorkers"フォームを使用するには、ローカルSidekiqクライアントをデフォルトとして使用するため、イニシャライザを使用しないでください。
  2. RemoteWorkersを使用すると、正しい引数が常に送信されます(コード=ドキュメント)
  3. より複雑なSidekiqアーキテクチャを作成することで拡張することは、呼び出し側にとって透過的です。ここで

は、私がこれに出くわしたと私は、メッセージがキューから読み出される方法を複雑にする、ActiveJobを使用していますので、いくつかの問題に遭遇したRemoteWorker

class RemoteTraceWorker 
    include RemoteSidekiqWorker 
    include ActiveModel::Model 

    attr_accessor :message 

    validates :message, presence: true 

    def perform_async 
    if valid? 
     push(worker_name, worker_args) 
    else 
     raise ActiveModel::StrictValidationFailed, errors.full_messages 
    end 
    end 

    private 

    def worker_name 
    :TraceWorker.to_s 
    end 

    def worker_args 
    [message] 
    end 
end 
0

例です。

ビルAROの答えに、あなたはまだredis_poolのセットアップが必要になります。

remote_sidekiq.rb

class RemoteSidekiq 
    class_attribute :redis_pool 
end 

設定/初期化子/ remote_sidekiq.rb

url = ENV.fetch("REDISCLOUD_URL") 
namespace = 'primary' 

redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url)) 

RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis } 

今をワーカーの代わりに、ActiveJobアダプタを作成してリクエストをキューに入れますt:

lib/active_job/queue_adapters/remote_sidekiq_adapter。RB

require 'sidekiq' 

module ActiveJob 
    module QueueAdapters 
    class RemoteSidekiqAdapter 
     def enqueue(job) 
     #Sidekiq::Client does not support symbols as keys 
     job.provider_job_id = client.push \ 
      "class" => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper, 
      "wrapped" => job.class.to_s, 
      "queue" => job.queue_name, 
      "args" => [ job.serialize ] 
     end 

     def enqueue_at(job, timestamp) 
     job.provider_job_id = client.push \ 
      "class" => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper, 
      "wrapped" => job.class.to_s, 
      "queue" => job.queue_name, 
      "args" => [ job.serialize ], 
      "at"  => timestamp 
     end 

     def client 
     @client ||= ::Sidekiq::Client.new(RemoteSidekiq.redis_pool) 
     end 
    end 
    end 
end 

私は今のイベントをキューにアダプタを使用することができます。

require 'active_job/queue_adapters/remote_sidekiq_adapter' 

class RemoteJob < ActiveJob::Base 
    self.queue_adapter = :remote_sidekiq 

    queue_as :default 

    def perform(_event_name, _data) 
    fail " 
     This job should not run here; intended to hook into 
     ActiveJob and run in another system 
    " 
    end 
end 

私は今、通常のActiveJob APIを使用してジョブをキューに入れることができます。いずれのアプリがこれをキューから読み込んでも、アクションを実行するには一致するRemoteJobが必要です。

関連する問題