2016-08-20 30 views
7

私は仕事の集まりを含む1つのrefを共有する100人の労働者(エージェント)を抱えています。このコレクションは、タスクを持っていますが、各ワーカーは(dosyncブロックで)このコレクション、それを印刷し、時には(dosyncブロックで)コレクションに戻ってそれを入れてから、一つのタスクを取得:clojure refの奇妙な振る舞い

(defn have-tasks? 
    [tasks] 
    (not (empty? @tasks))) 

(defn get-task 
    [tasks] 
    (dosync 
    (let [task (first @tasks)] 
     (alter tasks rest) 
     task))) 

(defn put-task 
    [tasks task] 
    (dosync (alter tasks conj task)) 
    nil) 

(defn worker 
    [& {:keys [tasks]}] 
    (agent {:tasks tasks})) 

(defn worker-loop 
    [{:keys [tasks] :as state}] 
    (while (have-tasks? tasks) 
    (let [task (get-task tasks)] 
     (println "Task: " task) 
     (when (< (rand) 0.1) 
     (put-task tasks task)))) 
    state) 

(defn create-workers 
    [count & options] 
    (->> (range 0 count) 
     (map (fn [_] (apply worker options))) 
     (into []))) 

(defn start-workers 
    [workers] 
    (doseq [worker workers] (send-off worker worker-loop))) 

(def tasks (ref (range 1 10000000))) 

(def workers (create-workers 100 :tasks tasks)) 

(start-workers workers) 
(apply await workers) 

私はこのコードを実行し、エージェントによって印刷された最後の値は(複数回試行した後): 435445, 4556294 1322061 3950017です。 しかし、決して9999999私は期待しています。 そして最後にコレクションが本当に空です。 私は間違っていますか?

編集:私はできるだけ単純労働者ループ書き直し

(defn worker-loop 
    [{:keys [tasks] :as state}] 
    (loop [] 
    (when-let [task (get-task tasks)] 
     (println "Task: " task) 
     (recur))) 
    state) 

をしかし、問題はまだそこにあります。 このコードは、唯一のワーカーを作成するときに期待どおりに動作します。

+0

'println'はスレッドセーフですか? –

+0

@ShannonSeveranceいいえ、読みやすいように_e.g._ like '(locking:out(println" ... "))'を使う必要があります。 –

答えて

4

問題は、エージェントと怠惰で行うにはほとんど何でもとは何の関係もありません。 rは確実にないのに対し、

(defn f [init] 
    (let [state (ref init) 
     task (fn [] 
       (loop [last-n nil] 
       (if-let [n (dosync 
           (let [n (first @state)] 
           (alter state rest) 
           n))] 
        (recur n) 
        (locking :out 
        (println "Last seen:" last-n))))) 
     workers (->> (range 0 5) 
        (mapv (fn [_] (Thread. task))))] 
    (doseq [w workers] (.start w)) 
    (doseq [w workers] (.join w)))) 

(defn r [] 
    (f (range 1 100000))) 

(defn i [] (f (->> (iterate inc 1) 
        (take 100000)))) 

(defn t [] 
    (f (->> (range 1 100000) 
      (take Integer/MAX_VALUE)))) 

このコードを実行すると、両方のit、両方の怠惰な、確実に動作することを示しています。ここではまだ問題を示す元のコードのやや縮小版です。問題は実際にはrange呼び出しによって返されたクラスの同時性バグです。実際、そのバグはthis Clojure ticketに記載されており、Clojureバージョン1.9.0-alpha11から修正されています。

チケットが何らかの理由でアクセスできない場合には、バグの簡単な要約:rangeの結果にrestコールの内部では、競合状態のための小さなチャンスがあった:「flag」それは言います「次の値はすでに計算されています」はset before the actual value itselfであり、「次の値」がまだnilであっても、2番目のスレッドがそのフラグを真と見なすことができました。 alterを呼び出すと、refにその値nilが設定されます。これはswapping the two assignment linesで修正されています。

rangeの結果が1つのスレッドで強制的に実現された場合や、別の遅延セーブでラップされた場合、そのバグは表示されません。

1

範囲内の最後の数値に達すると、従業員がまだ古い数値を保持しています。これらのうちのいくつかはキューに戻され、再度処理されます。

(defn worker-loop 
    [{:keys [tasks] :as state}] 
    (loop [last-task nil] 
    (if (have-tasks? tasks) 
     (let [task (get-task tasks)] 
     ;; (when (< (rand) 0.1) 
     ;; (put-task tasks task) 
     (recur task)) 
     (when last-task 
     (println "Last task:" last-task)))) 
    state) 

これは、タスクが頻繁にhave-tasks?によって見られたコード、の競合状態を示しています。各ワーカーによって処理最後のタスクを印刷するためのより良い何が起こっているかを確認するため、あなたがworker-loopを変更することができて

get-taskがタスクの処理の終わり近くに呼び出されたときに他の人が取ったものです。

競合状態は、have-tasks?を削除し、代わりにget-taskからnilの戻り値を使用して、それ以上のタスクが利用できないことを示す信号として解決できます。更新

を観察されたように、このレースの条件は、問題を説明していません。

どちらもこのようget-taskで可能な競合状態を除去することで解決される問題である:私は尋ねた

(defn get-task [tasks] 
    (locking :lock 
    (dosync 
     (let [task (first @tasks)] 
     (alter tasks rest) 
     task)))) 
+2

私はこれが理由だとは思わない。私は '(いつ(<(rand)... 'の式をコメントアウトして、キューに戻ってきたタスクを返さず、まだ部分だけを処理しています。停止前に最後に印刷するタスク番号はキュー全体の半分でもないので、理論は実際には意味をなさない。今日これを見て、私が答えを見つけたり、誰かができることを期待している。質問 – Josh

+0

はい、あなたは私のコードの競合状態について正しいです、おかげで私は、できるだけ簡単な私のコードを書き直したが、問題はまだそこにある –

3

(defn get-task [tasks] 
    (dosync 
    (first (alter tasks rest)))) 

しかし、明示的なロックを使用するようにget-taskを変更する問題を解決するようですこのquestionClojure Google Groupにあり、それは私が答えを見つけるのを助けました。

問題は、STMトランザクション内でレイジーシーケンスを使用していたことです。このことにより、

(def tasks (ref (range 1 10000000))) 

:予想通り、それは働いていた

(def tasks (ref (into [] (range 1 10000000)))) 

私はこのコードを置き換え

!問題が発生した私の生産コードで

は、私も自分の例のように、タプルの怠惰なコレクションを返すコルマフレームワークを使用していました。

結論:STMトランザクション内の怠惰なデータ構造の使用は避けてください。ここ

+0

、あなたの状態のために一つだけのREFを使用していたよう:。。。あなたは、ANを使用してみました同じ結果を与えるようだが、実行時に大きな(_ca_ 10 x)の削減をもたらすだろう –

+0

はい、私はそれについて考えましたが、調整されたアトミック関数 "get-taskを書く方法はわかりません"この場合、スワップ機能にどのような機能を渡すべきですか? –

+1

'(defn get-taks [tasks](let [my-tasks @tasks](if(compare-and-set! (最初のmy-tasks)(最初のmy-tasks)(再帰タスク)))) ')、またはダミー0でタスクを開始し、'(defn get-tasks [tasks]残り))) ' –