このメモの長さは、事前にお詫び申し上げます。私はそれを短くするためにかなりの時間を費やしましたが、これは私が得ることができるほど小さかったです。rxjavaとclojure非同期ミステリー:未来の約束と代理人、oh my
私は謎があり、あなたの助けに感謝します。この謎は、私がオンラインサンプルから手に入れた2つの簡単なobservable
を介してClojureに書いたrxjava observer
の動作に由来します。
オブザーバブルオブザーバは、オブザーバのハンドラに同期してメッセージを送信します。私の考えでは、原理上のオブザーバは期待通りに動作します。
他のスレッドは、Clojure future
を使用して、他のスレッドでも同じことを非同期的に実行します。まったく同じオブザーバーは、onNext
に投稿されたすべてのイベントをキャプチャしません。それはちょうど末尾にメッセージの乱数を失うようです。
promise
D onCompleted
待ちの有効期限とagent
コレクタに送信されたすべてのイベント待ちの有効期限の間、以下では意図的なレースがあります。 promise
が勝った場合、onCompleted
の場合はfalse
、agent
の場合は短いキューが表示されます。 agent
が勝った場合、onCompleted
の場合はtrue
、agent
のキューのすべてのメッセージが表示されます。私が期待していない結果の1つは、の場合はtrue
、agent
の場合は短いキューです。しかし、マーフィーは眠らないし、それはまさに私が見るものです。私はガーベジコレクションが間違っているのか、ClojureのSTMへの内部キューイングか、私のばかげているのか、それとも全く別なものかはわかりません。
ソースは、自己完結型の順番で表示されていますので、lein repl
で直接実行できます。ネットフリックスのrxjavaの0.9.0
バージョンへの依存を宣言まず、Leiningenをプロジェクトファイル、project.clj
、::今すぐ
(defproject expt2 "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.5.1"]
[com.netflix.rxjava/rxjava-clojure "0.9.0"]]
:main expt2.core)
、名前空間とClojureの要件とJavaの輸入三方から抜け出すためにcermonialsがあります。 :
(ns expt2.core
(:require clojure.pprint)
(:refer-clojure :exclude [distinct])
(:import [rx Observable subscriptions.Subscriptions]))
最後に、コンソールに出力するマクロ:
(defmacro pdump [x]
`(let [x# ~x]
(do (println "----------------")
(clojure.pprint/pprint '~x)
(println "~~>")
(clojure.pprint/pprint x#)
(println "----------------")
x#)))
最後に、私の観察者。私はagent
を使用して、観察可能なもののいずれかによって送信されたメッセージを収集します。onNext
私は可能性を収集するためにを使用しますonError
。観察者の外部の消費者がそれを待つことができるように、私はをonCompleted
に使用します。
(defn- subscribe-collectors [obl]
(let [;; Keep a sequence of all values sent:
onNextCollector (agent [])
;; Only need one value if the observable errors out:
onErrorCollector (atom nil)
;; Use a promise for 'completed' so we can wait for it on
;; another thread:
onCompletedCollector (promise)]
(letfn [;; When observable sends a value, relay it to our agent"
(collect-next [item] (send onNextCollector (fn [state] (conj state item))))
;; If observable errors out, just set our exception;
(collect-error [excp] (reset! onErrorCollector excp))
;; When observable completes, deliver on the promise:
(collect-completed [ ] (deliver onCompletedCollector true))
;; In all cases, report out the back end with this:
(report-collectors [ ]
(pdump
;; Wait for everything that has been sent to the agent
;; to drain (presumably internal message queues):
{:onNext (do (await-for 1000 onNextCollector)
;; Then produce the results:
@onNextCollector)
;; If we ever saw an error, here it is:
:onError @onErrorCollector
;; Wait at most 1 second for the promise to complete;
;; if it does not complete, then produce 'false'.
;; I expect if this times out before the agent
;; times out to see an 'onCompleted' of 'false'.
:onCompleted (deref onCompletedCollector 1000 false)
}))]
;; Recognize that the observable 'obl' may run on another thread:
(-> obl
(.subscribe collect-next collect-error collect-completed))
;; Therefore, produce results that wait, with timeouts, on both
;; the completion event and on the draining of the (presumed)
;; message queue to the agent.
(report-collectors))))
ここでは同期観測があります。それはオブザーバーのonNext
の喉の下に25個のメッセージを送り、次にonCompleted
と呼んでいます。
(defn- customObservableBlocking []
(Observable/create
(fn [observer] ; This is the 'subscribe' method.
;; Send 25 strings to the observer's onNext:
(doseq [x (range 25)]
(-> observer (.onNext (str "SynchedValue_" x))))
; After sending all values, complete the sequence:
(-> observer .onCompleted)
; return a NoOpSubsription since this blocks and thus
; can't be unsubscribed (disposed):
(Subscriptions/empty))))
私たちは、この観測可能に私たちのオブザーバーを購読:期待どおりに動作します
;;; The value of the following is the list of all 25 events:
(-> (customObservableBlocking)
(subscribe-collectors))
、そして私たちは、コンソール上で次の結果を参照してくださいここで
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
:onError @onErrorCollector,
:onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
["SynchedValue_0"
"SynchedValue_1"
"SynchedValue_2"
"SynchedValue_3"
"SynchedValue_4"
"SynchedValue_5"
"SynchedValue_6"
"SynchedValue_7"
"SynchedValue_8"
"SynchedValue_9"
"SynchedValue_10"
"SynchedValue_11"
"SynchedValue_12"
"SynchedValue_13"
"SynchedValue_14"
"SynchedValue_15"
"SynchedValue_16"
"SynchedValue_17"
"SynchedValue_18"
"SynchedValue_19"
"SynchedValue_20"
"SynchedValue_21"
"SynchedValue_22"
"SynchedValue_23"
"SynchedValue_24"],
:onError nil,
:onCompleted true}
----------------
が行う非同期観測可能ですまったく同じこと、future
のスレッド:
しかし、驚いたことに、ここでコンソールに表示されているのはtrue
onCompleted
です。これは、promise
がタイムアウトしなかったことを意味します。非同期メッセージの一部のみが表示されます。表示されるメッセージの実際の数は、実行ごとに異なります。つまり、いくつかの同時実行現象が存在することを意味します。手がかりが高く評価されました。
----------------
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
:onError @onErrorCollector,
:onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
["AsynchValue_0"
"AsynchValue_1"
"AsynchValue_2"
"AsynchValue_3"
"AsynchValue_4"
"AsynchValue_5"
"AsynchValue_6"],
:onError nil,
:onCompleted true}
----------------
確認済みでテスト済みです。 –