2012-10-04 31 views
5

Clojureのコンシューマ/パブリッシャは、メッセージを受信して​​処理し、RabbitMQ経由で他のコンシューマに送信します。Clojureメッセージ処理/非同期、マルチスレッド

別のスレッド(メインスレッドとは別のスレッド)でメッセージを処理するメッセージハンドラを定義しました。 以下のコードからわかるように、スレッドはメッセージを同期して送受信し、すべてlcm/subscribe関数によって開始されたイベントループで発生します。

したがって、これらの同期メッセージハンドラのNサイズのスレッドプールを作成する "Clojure way"はどうでしょうか?私は非Clojureの方法は、Java interop経由でいくつかのスレッドを手動で起動することだと思います。

また、処理が非常にCPU集約的ではないことを考慮すると、メッセージ処理の処理速度が向上しますか?これらのメッセージハンドラを非同期にする方が良いでしょうか。処理よりも発行に時間がかかることを考慮してください。

最後に、これらの競合するアプローチ(私はRuby/Javascriptの世界から来ており、そこにマルチスレッド化はありません)のパフォーマンスをどのように測定するのでしょうか?

: 私はこのすべてがちょうど水平スケーリング、メッセージバスに耳を傾け、よりJVMのプロセスを生成することで回避できることを知っているが、アプリはHerokuの上に展開されようとしていることから、私はとして使用したいのですができるだけ多くのリソースを各dyno /プロセスで使用できます。

より基本的なノートで
(defn message-handler 
    [ch metadata ^bytes payload] 
    (let [msg (json/parse-string (String. payload "UTF-8")) 
     processed-message (process msg)] 
    (lb/publish ch "e.events" "" processed-message))) 

(defn -main 
    [& args] 
    (let [conn   (rmq/connect {:uri (System/getenv "MSGQ")}) 
     ch   (lch/open conn) 
     q-name  "q.events.tagger" 
     e-sub-name "e.events.preproc" 
     e-pub-name "e.events" 
     routing-key "tasks.taggify"] 
    (lq/declare ch q-name :exclusive false :auto-delete false) 
    (le/declare ch e-pub-name "fanout" :durable false) 
    (lq/bind ch q-name e-sub-name :routing-key routing-key) 
    (.start (Thread. (fn [] 
         (lcm/subscribe ch q-name message-handler :auto-ack true)))))) 

...私はこのように、追加の引数を指定してメッセージ・ハンドラのコールバックを登録サポートするために、このコードをリファクタリングについては行くだろうか:

(.start (Thread. (fn [] 
         (lcm/subscribe ch q-name (message-handler pub-name) :auto-ack true)))))) 

をし、その後、参照して公開:

(lb/publish ch pub-name "" processed-message))) 

代わりにリテラルの:

(lb/publish ch "e.events" "" processed-message))) 

答えて

2

質問の後半部分では、あなたは部分的にアプリケーションを使用することができます。先端のための

(defn message-handler 
    [pub-name ch metadata ^bytes payload] 
    (let [msg (json/parse-string (String. payload "UTF-8")) 
     processed-message (process msg)] 
    (lb/publish ch pub-name "" processed-message))) 



(.start 
    (Thread. 
    (fn [] 
     (lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true)))))) 
1

これは非常に大きなトピックです。この質問を複数の異なる質問に分割することを検討することもできますが、簡潔な回答はuse agentsです。以下に示すように

+0

おかげで、行います。 – neektza

関連する問題