2013-06-01 16 views
8

このメモの長さは、事前にお詫び申し上げます。私はそれを短くするためにかなりの時間を費やしましたが、これは私が得ることができるほど小さかったです。rxjavaとclojure非同期ミステリー:未来の約束と代理人、oh my

私は謎があり、あなたの助けに感謝します。この謎は、私がオンラインサンプルから手に入れた2つの簡単なobservableを介してClojureに書いたrxjava observerの動作に由来します。

オブザーバブルオブザーバは、オブザーバのハンドラに同期してメッセージを送信します。私の考えでは、原理上のオブザーバは期待通りに動作します。

他のスレッドは、Clojure futureを使用して、他のスレッドでも同じことを非同期的に実行します。まったく同じオブザーバーは、onNextに投稿されたすべてのイベントをキャプチャしません。それはちょうど末尾にメッセージの乱数を失うようです。

promise D onCompleted待ちの有効期限とagentコレクタに送信されたすべてのイベント待ちの有効期限の間、以下では意図的なレースがあります。 promiseが勝った場合、onCompletedの場合はfalseagentの場合は短いキューが表示されます。 agentが勝った場合、onCompletedの場合はtrueagentのキューのすべてのメッセージが表示されます。私が期待していない結果の1つは、の場合はtrueagentの場合は短いキューです。しかし、マーフィーは眠らないし、それはまさに私が見るものです。私はガーベジコレクションが間違っているのか、ClojureのSTMへの内部キューイングか、私のばかげているのか、それとも全く別なものかはわかりません。

ソースは、自己完結型の順番で表示されていますので、lein replで直接実行できます。ネットフリックスのrxjavaの0.9.0バージョンへの依存を宣言まず、Leiningenをプロジェクトファイル、project.clj、::今すぐ

(defproject expt2 "0.1.0-SNAPSHOT" 
    :description "FIXME: write description" 
    :url "http://example.com/FIXME" 
    :license {:name "Eclipse Public License" 
      :url "http://www.eclipse.org/legal/epl-v10.html"} 
    :dependencies [[org.clojure/clojure    "1.5.1"] 
       [com.netflix.rxjava/rxjava-clojure "0.9.0"]] 
    :main expt2.core) 

、名前空間とClojureの要件とJavaの輸入三方から抜け出すためにcermonialsがあります。 :

(ns expt2.core 
    (:require clojure.pprint) 
    (:refer-clojure :exclude [distinct]) 
    (:import [rx Observable subscriptions.Subscriptions])) 

最後に、コンソールに出力するマクロ:

(defmacro pdump [x] 
    `(let [x# ~x] 
    (do (println "----------------") 
     (clojure.pprint/pprint '~x) 
     (println "~~>") 
     (clojure.pprint/pprint x#) 
     (println "----------------") 
     x#))) 

最後に、私の観察者。私はagentを使用して、観察可能なもののいずれかによって送信されたメッセージを収集します。onNext私は可能性を収集するために​​を使用しますonError。観察者の外部の消費者がそれを待つことができるように、私はをonCompletedに使用します。

(defn- subscribe-collectors [obl] 
    (let [;; Keep a sequence of all values sent: 
     onNextCollector  (agent []) 
     ;; Only need one value if the observable errors out: 
     onErrorCollector  (atom nil) 
     ;; Use a promise for 'completed' so we can wait for it on 
     ;; another thread: 
     onCompletedCollector (promise)] 
    (letfn [;; When observable sends a value, relay it to our agent" 
      (collect-next  [item] (send onNextCollector (fn [state] (conj state item)))) 
      ;; If observable errors out, just set our exception; 
      (collect-error  [excp] (reset! onErrorCollector  excp)) 
      ;; When observable completes, deliver on the promise: 
      (collect-completed [ ] (deliver onCompletedCollector true)) 
      ;; In all cases, report out the back end with this: 
      (report-collectors [ ] 
       (pdump 
       ;; Wait for everything that has been sent to the agent 
       ;; to drain (presumably internal message queues): 
       {:onNext  (do (await-for 1000 onNextCollector) 
           ;; Then produce the results: 
           @onNextCollector) 
       ;; If we ever saw an error, here it is: 
       :onError  @onErrorCollector 
       ;; Wait at most 1 second for the promise to complete; 
       ;; if it does not complete, then produce 'false'. 
       ;; I expect if this times out before the agent 
       ;; times out to see an 'onCompleted' of 'false'. 
       :onCompleted (deref onCompletedCollector 1000 false) 
       }))] 
     ;; Recognize that the observable 'obl' may run on another thread: 
     (-> obl 
      (.subscribe collect-next collect-error collect-completed)) 
     ;; Therefore, produce results that wait, with timeouts, on both 
     ;; the completion event and on the draining of the (presumed) 
     ;; message queue to the agent. 
     (report-collectors)))) 

ここでは同期観測があります。それはオブザーバーのonNextの喉の下に25個のメッセージを送り、次にonCompletedと呼んでいます。

(defn- customObservableBlocking [] 
    (Observable/create 
    (fn [observer]      ; This is the 'subscribe' method. 
     ;; Send 25 strings to the observer's onNext: 
     (doseq [x (range 25)] 
     (-> observer (.onNext (str "SynchedValue_" x)))) 
     ; After sending all values, complete the sequence: 
     (-> observer .onCompleted) 
     ; return a NoOpSubsription since this blocks and thus 
     ; can't be unsubscribed (disposed): 
     (Subscriptions/empty)))) 

私たちは、この観測可能に私たちのオブザーバーを購読:期待どおりに動作します

;;; The value of the following is the list of all 25 events: 
(-> (customObservableBlocking) 
    (subscribe-collectors)) 

、そして私たちは、コンソール上で次の結果を参照してくださいここで

{:onNext (do (await-for 1000 onNextCollector) @onNextCollector), 
:onError @onErrorCollector, 
:onCompleted (deref onCompletedCollector 1000 false)} 
~~> 
{:onNext 
["SynchedValue_0" 
    "SynchedValue_1" 
    "SynchedValue_2" 
    "SynchedValue_3" 
    "SynchedValue_4" 
    "SynchedValue_5" 
    "SynchedValue_6" 
    "SynchedValue_7" 
    "SynchedValue_8" 
    "SynchedValue_9" 
    "SynchedValue_10" 
    "SynchedValue_11" 
    "SynchedValue_12" 
    "SynchedValue_13" 
    "SynchedValue_14" 
    "SynchedValue_15" 
    "SynchedValue_16" 
    "SynchedValue_17" 
    "SynchedValue_18" 
    "SynchedValue_19" 
    "SynchedValue_20" 
    "SynchedValue_21" 
    "SynchedValue_22" 
    "SynchedValue_23" 
    "SynchedValue_24"], 
:onError nil, 
:onCompleted true} 
---------------- 

が行う非同期観測可能ですまったく同じこと、futureのスレッド:

しかし、驚いたことに、ここでコンソールに表示されているのはtrueonCompletedです。これは、promiseがタイムアウトしなかったことを意味します。非同期メッセージの一部のみが表示されます。表示されるメッセージの実際の数は、実行ごとに異なります。つまり、いくつかの同時実行現象が存在することを意味します。手がかりが高く評価されました。

---------------- 
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector), 
:onError @onErrorCollector, 
:onCompleted (deref onCompletedCollector 1000 false)} 
~~> 
{:onNext 
["AsynchValue_0" 
    "AsynchValue_1" 
    "AsynchValue_2" 
    "AsynchValue_3" 
    "AsynchValue_4" 
    "AsynchValue_5" 
    "AsynchValue_6"], 
:onError nil, 
:onCompleted true} 
---------------- 

答えて

7

は、エージェント上のawait-forブロックすべてのアクションは、このように派遣されるまで、現在のスレッドを意味 遠くそれはあなたのawait後のことが起こり得ることを意味を発生しているエージェントに、(このスレッドまたはエージェントから)エージェントにメッセージを送信できるスレッドがまだあり、それがあなたのケースで起こっていることです。代理店での待っているあなたの待っていると、あなたはその値を地図内の:onNextキーでderefを持っているし、待ってから完了した約束を待つが、その間に他のいくつかのメッセージがベクター中に収集されるべき薬剤を含む。

これは、基本的には完了を待つことを意味するキーを地図の最初のキーにすることで解決できます。すでにonCompletedを受け取っています。

{:onCompleted (deref onCompletedCollector 1000 false) 
:onNext  (do (await-for 0 onNextCollector) 
           @onNextCollector) 
:onError  @onErrorCollector 
} 
+0

確認済みでテスト済みです。 –