Clojureでcore.asyncコードを書きましたが、実行したときに使用可能なメモリがすべて消費され、エラーが発生しました。 core.asyncパイプラインでmapcat
を使用すると、バックプレッシャーがかかってしまうようです。 (この質問の範囲を超えての理由のために残念なことですどの。)mapcatがcore.asyncのバックプレッシャーを破るときのメモリリークはどこですか?
ここmapcat
INGトランスデューサの内外:x
秒をカウントすることで、問題を示し、いくつかのコードです:
(ns mapcat.core
(:require [clojure.core.async :as async]))
(defn test-backpressure [n length]
(let [message (repeat length :x)
input (async/chan)
transform (async/chan 1 (mapcat seq))
output (async/chan)
sent (atom 0)]
(async/pipe input transform)
(async/pipe transform output)
(async/go
(dotimes [_ n]
(async/>! input message)
(swap! sent inc))
(async/close! input))
(async/go-loop [x 0]
(when (= 0 (mod x (/ (* n length) 10)))
(println "in:" (* @sent length) "out:" x))
(when-let [_ (async/<! output)]
(recur (inc x))))))
=> (test-backpressure 1000 10)
in: 10 out: 0
in: 2680 out: 1000
in: 7410 out: 2000
in: 10000 out: 3000 ; Where are the other 7000 characters?
in: 10000 out: 4000
in: 10000 out: 5000
in: 10000 out: 6000
in: 10000 out: 7000
in: 10000 out: 8000
in: 10000 out: 9000
in: 10000 out: 10000
プロデューサーレース遠くは消費者に先んじて
私はこれを発見した最初の人ではないようです。 しかし、hereの説明はそれをカバーしていないようです。 (それは適切な回避策を提供しますが)。 概念的には、プロデューサーは先に進むと予想されますが、チャネルにバッファリングされる可能性のある少数のメッセージの長さだけです。
他のメッセージはどこにありますか? 4行目の出力では、7000 :x
は未確認です。
あなたが与えたリンクで、アレックスは間違った結果とバッファ制限違反のジレンマだと言いました。明らかに[ASYNC-124](http://dev.clojure.org/jira/browse/ASYNC-124)は正解を好む – Davyzhu
あなたの質問に関して、他のメッセージはここで参照される](https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/channels.clj#L86)。それほど確実ではないので、もっと自信を持って答えてみましょう。 – Davyzhu