私はcore.asyncやチャンネルなどをよりよく理解しようとしています。clojure jdbc - > async channel - > csvファイル...なぜ私は怠惰ではないのですか?
私の仕事は、データベースでjdbc select文を発行し、結果を非同期チャネルにストリーミングすることです。
clojure.data.csv
を使用して、このチャネルから排他的なスレッドを取り出し、csvファイルに書き込みたいと考えています。
以下のプログラムを実行すると、遅延が発生していないようです...端末に出力されず、すべてが一度に表示され、csvファイルには50行あります。私は誰かがなぜ私が理解するのを助けることを望んでいます。
事前のおかげで、
(ns db-async-test.core-test
(:require [clojure.java.jdbc :as j]
[clojure.java.io :as io]
[clojure.data.csv :as csv]
[clojure.core.async :as async :refer [>! <! >!! <!! chan thread]]
[clojure.string :as str]
[while-let.core :refer [while-let]]))
(defn db->chan [ch {:keys [sql db-spec]} ]
"Given input channel ch, sql select, and db-spec connection info, put db
hash-maps onto ch in a separate thread. Through back pressure I'm hoping to
populate channel lazily as a consumer does downstream processing."
(println "starting fetch...")
(let [
row-count (atom 0) ; For state on rows
db-connection (j/get-connection db-spec)
statement (j/prepare-statement
db-connection
sql {
:result-type :forward-only ;; you need this to be lazy
:fetch-size 3 ;; also this
:max-rows 0
:concurrency :read-only})
row-fn (fn[d] (do
(>!! ch d)
;; everything below is just for printing to stdout and
;; trying to understand where my non-lazy bottleneck is.
(swap! row-count inc)
(when (zero? (mod @row-count 5))
(do
#_(Thread/sleep 2000)
(println "\tFetched " @row-count " rows.")
(flush)
))))]
(thread
(j/query db-connection [statement]
{:as-arrays? false
:result-set-fn vec
:row-fn row-fn
})
;; as producer we finished popluting the chan, now close in this same
;; thread.
(println "producer closing channel... (hopefully you have written rows by now...")
(async/close! ch))))
(defn chan->csv [ch csv-file ]
"With input channel ch and output file csv-file, read values off ch and write
to csv file in a separate thread."
(thread
(println "starting csv write...")
(def row-count (atom 0))
(with-open [^java.io.Writer writer (io/writer csv-file :append false :encoding "UTF-8")]
(while-let [data (<!! ch)]
(swap! row-count inc)
(csv/write-csv writer [data] :quote? (fn[x] false))
(when (zero? (mod @row-count 2))
(do
#_(Thread/sleep 2000)
(println "Wrote " @row-count " rows.")
(.flush writer)
(flush)))
))))
(def config {:db-spec {:classname "org.postgres.Driver"
:subprotocol "postgres"
:subname "//my-database-host:5432/mydb"
:user "me"
:password "****"}
:sql "select row_id, giant_xml_column::text as xml_column from public.big_toasty_table limit 50"})
;; main sorta thing
(do
(def ch (chan 1))
(db->chan ch config)
;; could pipeline with transducers/etc at some point.
(chan->csv ch "./test.csv"))
相続人/どのようにそれが出てきたときに説明する私のコメントといくつかの出力:
db-async-test.core-test>
;; this happens pretty quick when i run:
starting fetch...
starting csv write...
;; then it waits 30 seconds, and spits out all the output below... it's not
;; "streaming" through lazily?
Wrote 2 rows.
Fetched 5 rows.
Wrote 4 rows.
Wrote 6 rows.
Wrote 8 rows.
...clip...
Wrote 44 rows.
Wrote 46 rows.
Wrote 48 rows.
Fetched 50 rows.
producer closing channel... (hopefully you have written rows by now...
Wrote 50 rows.
what 'while-let'は何ですか?コード – mavbozo
を共有してください私は私のproject.cljからそれを引っ張った:https://github.com/markmandel/while-let – joefromct
そのdb /ドライバを使ってjdbcの奇妙なことがあるかもしれません。 'clojure.java.jdbc'はDBの違いのために動作を期待できなかった過去の問題を抱えていました。 jdbcの設定を改善するためには、レベルを下げる必要があります。関連するpostgreSQLの問題と思われるものについては、 を参照してください:https://stackoverflow.com/a/39775018/4351017 –