9

Clojureでcore.asyncコードを書きましたが、実行したときに使用可能なメモリがすべて消費され、エラーが発生しました。 core.asyncパイプラインでmapcatを使用すると、バックプレッシャーがかかってしまうようです。 (この質問の範囲を超えての理由のために残念なことですどの。)mapcatがcore.asyncのバックプレッシャーを破るときのメモリリークはどこですか?

ここmapcat INGトランスデューサの内外:x秒をカウントすることで、問題を示し、いくつかのコードです:

(ns mapcat.core 
    (:require [clojure.core.async :as async])) 

(defn test-backpressure [n length] 
    (let [message (repeat length :x) 
     input (async/chan) 
     transform (async/chan 1 (mapcat seq)) 
     output (async/chan) 
     sent (atom 0)] 
    (async/pipe input transform) 
    (async/pipe transform output) 
    (async/go 
     (dotimes [_ n] 
     (async/>! input message) 
     (swap! sent inc)) 
     (async/close! input)) 
    (async/go-loop [x 0] 
     (when (= 0 (mod x (/ (* n length) 10))) 
     (println "in:" (* @sent length) "out:" x)) 
     (when-let [_ (async/<! output)] 
     (recur (inc x)))))) 

=> (test-backpressure 1000 10) 
in: 10 out: 0 
in: 2680 out: 1000 
in: 7410 out: 2000 
in: 10000 out: 3000 ; Where are the other 7000 characters? 
in: 10000 out: 4000 
in: 10000 out: 5000 
in: 10000 out: 6000 
in: 10000 out: 7000 
in: 10000 out: 8000 
in: 10000 out: 9000 
in: 10000 out: 10000 

プロデューサーレース遠くは消費者に先んじて

私はこれを発見した最初の人ではないようです。 しかし、hereの説明はそれをカバーしていないようです。 (それは適切な回避策を提供しますが)。 概念的には、プロデューサーは先に進むと予想されますが、チャネルにバッファリングされる可能性のある少数のメッセージの長さだけです。

他のメッセージはどこにありますか? 4行目の出力では、7000 :xは未確認です。

+0

あなたが与えたリンクで、アレックスは間違った結果とバッファ制限違反のジレンマだと言いました。明らかに[ASYNC-124](http://dev.clojure.org/jira/browse/ASYNC-124)は正解を好む – Davyzhu

+0

あなたの質問に関して、他のメッセージはここで参照される](https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/channels.clj#L86)。それほど確実ではないので、もっと自信を持って答えてみましょう。 – Davyzhu

答えて

2

「メモリリークはどこですか?」という質問には2つの解釈があります。

まず、データはどこに保持されていますか?その答えは、拡大変換のすぐ下流のチャンネルバッファにあるようです。

デフォルトでは、チャネルはフルであるかどうかを知ることができますが、過度に過ぎないと判断できるFixedBufferclojure.core.async.impl.buffers/FixedBuffer)を使用します。

第2に、コードのどの部分がバッファーをいっぱいにするのでしょうか?これは、私が間違っていれば私を修正しますのManyToManyChannelclojure.core.async.impl.channels/ManyToManyChannel)どこにcalls to full?が発生する前に、バッファ上のfirst call to add!が発生しているように見えます。

take!は、削除するアイテムごとに少なくとも1つのアイテムをバッファに追加できると仮定しているようです。 mapcatのように長時間動作する拡張トランスデューサの場合、これは常に安全な前提ではありません。

this lineから(when (and (.hasNext iter) (not (impl/full? buf)))をcore.asyncのローカルコピーに変更することで、問題のコードを期待どおりに動作させることができます。 (私はこれはあなたユースケースのための堅牢なソリューションであることを保証するためにNBはcore.asyncの私の理解が不十分である。)

UPDATE 2016年9月17日:http://dev.clojure.org/jira/browse/ASYNC-178

:このための問題は、今そこにあります
関連する問題