私はcore.asyncやチャンネルなどをよりよく理解しようとしています。clojure jdbc - > async channel - > csvファイル...なぜ私は怠惰ではないのですか?
私の仕事は、データベースでjdbc select文を発行し、結果を非同期チャネルにストリーミングすることです。
(ns db-async-test.core-test
(:require [clojure.java.jdbc :as j]
[clojure.java.io :as io]
[clojure.data.csv :as csv]
[clojure.core.async :as async :refer [>! <! >!! <!! chan thread]]
[clojure.string :as str]
[while-let.core :refer [while-let]]))
(defn db->chan [ch {:keys [sql db-spec]} ]
"Given input channel ch, sql select, and db-spec connection info, put db
hash-maps onto ch in a separate thread. Through back pressure I'm hoping to
populate channel lazily as a consumer does downstream processing."
(println "starting fetch...")
(let [
row-count (atom 0) ; For state on rows
db-connection (j/get-connection db-spec)
statement (j/prepare-statement
sql {
:result-type :forward-only ;; you need this to be lazy
:fetch-size 3 ;; also this
:max-rows 0
:concurrency :read-only})
row-fn (fn[d] (do
(>!! ch d)
;; everything below is just for printing to stdout and
;; trying to understand where my non-lazy bottleneck is.
(swap! row-count inc)
(when (zero? (mod @row-count 5))
#_(Thread/sleep 2000)
(println "\tFetched " @row-count " rows.")
(j/query db-connection [statement]
{:as-arrays? false
:result-set-fn vec
:row-fn row-fn
;; as producer we finished popluting the chan, now close in this same
;; thread.
(println "producer closing channel... (hopefully you have written rows by now...")
(async/close! ch))))
(defn chan->csv [ch csv-file ]
"With input channel ch and output file csv-file, read values off ch and write
to csv file in a separate thread."
(println "starting csv write...")
(def row-count (atom 0))
(with-open [^java.io.Writer writer (io/writer csv-file :append false :encoding "UTF-8")]
(while-let [data (<!! ch)]
(swap! row-count inc)
(csv/write-csv writer [data] :quote? (fn[x] false))
(when (zero? (mod @row-count 2))
#_(Thread/sleep 2000)
(println "Wrote " @row-count " rows.")
(.flush writer)
(def config {:db-spec {:classname "org.postgres.Driver"
:subprotocol "postgres"
:subname "//my-database-host:5432/mydb"
:user "me"
:password "****"}
:sql "select row_id, giant_xml_column::text as xml_column from public.big_toasty_table limit 50"})
;; main sorta thing
(def ch (chan 1))
(db->chan ch config)
;; could pipeline with transducers/etc at some point.
(chan->csv ch "./test.csv"))
;; this happens pretty quick when i run:
starting fetch...
starting csv write...
;; then it waits 30 seconds, and spits out all the output below... it's not
;; "streaming" through lazily?
Wrote 2 rows.
Fetched 5 rows.
Wrote 4 rows.
Wrote 6 rows.
Wrote 8 rows.
Wrote 44 rows.
Wrote 46 rows.
Wrote 48 rows.
Fetched 50 rows.
producer closing channel... (hopefully you have written rows by now...
Wrote 50 rows.
what 'while-let'は何ですか?コード – mavbozo
を共有してください私は私のproject.cljからそれを引っ張った:https://github.com/markmandel/while-let – joefromct
そのdb /ドライバを使ってjdbcの奇妙なことがあるかもしれません。 'clojure.java.jdbc'はDBの違いのために動作を期待できなかった過去の問題を抱えていました。 jdbcの設定を改善するためには、レベルを下げる必要があります。関連するpostgreSQLの問題と思われるものについては、 を参照してください:https://stackoverflow.com/a/39775018/4351017 –