2017-11-08 12 views
1

私はcore.asyncやチャンネルなどをよりよく理解しようとしています。clojure jdbc - > async channel - > csvファイル...なぜ私は怠惰ではないのですか?

私の仕事は、データベースでjdbc select文を発行し、結果を非同期チャネルにストリーミングすることです。

clojure.data.csvを使用して、このチャネルから排他的なスレッドを取り出し、csvファイルに書き込みたいと考えています。

以下のプログラムを実行すると、遅延が発生していないようです...端末に出力されず、すべてが一度に表示され、csvファイルには50行あります。私は誰かがなぜ私が理解するのを助けることを望んでいます。

事前のおかげで、

(ns db-async-test.core-test 
    (:require [clojure.java.jdbc :as j] 
      [clojure.java.io :as io] 
      [clojure.data.csv :as csv] 
      [clojure.core.async :as async :refer [>! <! >!! <!! chan thread]] 
      [clojure.string  :as str] 
      [while-let.core  :refer [while-let]])) 


(defn db->chan [ch {:keys [sql db-spec]} ] 
    "Given input channel ch, sql select, and db-spec connection info, put db 
    hash-maps onto ch in a separate thread. Through back pressure I'm hoping to 
    populate channel lazily as a consumer does downstream processing." 
    (println "starting fetch...") 
    (let [ 
     row-count   (atom 0) ; For state on rows 
     db-connection  (j/get-connection db-spec) 
     statement (j/prepare-statement 
        db-connection 
        sql { 
         :result-type :forward-only ;; you need this to be lazy 
         :fetch-size 3    ;; also this 
         :max-rows 0 
         :concurrency :read-only}) 
     row-fn (fn[d] (do 
         (>!! ch d) 
         ;; everything below is just for printing to stdout and 
         ;; trying to understand where my non-lazy bottleneck is. 
         (swap! row-count inc) 
         (when (zero? (mod @row-count 5)) 
         (do 
          #_(Thread/sleep 2000) 
          (println "\tFetched " @row-count " rows.") 
          (flush) 
          ))))] 
    (thread 
     (j/query db-connection [statement] 
       {:as-arrays? false 
       :result-set-fn vec 
       :row-fn row-fn 
       }) 
     ;; as producer we finished popluting the chan, now close in this same 
     ;; thread. 
     (println "producer closing channel... (hopefully you have written rows by now...") 
     (async/close! ch)))) 


(defn chan->csv [ch csv-file ] 
    "With input channel ch and output file csv-file, read values off ch and write 
    to csv file in a separate thread." 
    (thread 
    (println "starting csv write...") 
    (def row-count (atom 0)) 
    (with-open [^java.io.Writer writer (io/writer csv-file :append false :encoding "UTF-8")] 
     (while-let [data (<!! ch)] 
     (swap! row-count inc) 
     (csv/write-csv writer [data] :quote? (fn[x] false)) 
     (when (zero? (mod @row-count 2)) 
      (do 
      #_(Thread/sleep 2000) 
      (println "Wrote " @row-count " rows.") 
      (.flush writer) 
      (flush))) 
     )))) 

(def config {:db-spec {:classname "org.postgres.Driver" 
         :subprotocol "postgres" 
         :subname "//my-database-host:5432/mydb" 
         :user "me" 
         :password "****"} 
      :sql "select row_id, giant_xml_column::text as xml_column from public.big_toasty_table limit 50"}) 

;; main sorta thing 
(do 
    (def ch (chan 1)) 
    (db->chan ch config) 
    ;; could pipeline with transducers/etc at some point. 
    (chan->csv ch "./test.csv")) 

相続人/どのようにそれが出てきたときに説明する私のコメントといくつかの出力:

db-async-test.core-test> 
;; this happens pretty quick when i run: 
starting fetch... 
starting csv write... 

;; then it waits 30 seconds, and spits out all the output below... it's not 
;; "streaming" through lazily? 
Wrote 2 rows. 
    Fetched 5 rows. 
Wrote 4 rows. 
Wrote 6 rows. 
Wrote 8 rows. 
...clip... 
Wrote 44 rows. 
Wrote 46 rows. 
Wrote 48 rows. 
    Fetched 50 rows. 
producer closing channel... (hopefully you have written rows by now... 
Wrote 50 rows. 
+0

what 'while-let'は何ですか?コード – mavbozo

+0

を共有してください私は私のproject.cljからそれを引っ張った:https://github.com/markmandel/while-let – joefromct

+0

そのdb /ドライバを使ってjdbcの奇妙なことがあるかもしれません。 'clojure.java.jdbc'はDBの違いのために動作を期待できなかった過去の問題を抱えていました。 jdbcの設定を改善するためには、レベルを下げる必要があります。関連するpostgreSQLの問題と思われるものについては、 を参照してください:https://stackoverflow.com/a/39775018/4351017 –

答えて

1

を[OK]を、私は私が私のために働く何かを持っていると思います。

私が持っていた主な修正はorg.clojure.java/jdbcをスワップアウトしたと は私のproject.cljfuncool/clojure.jdbcとそれを置き換えます。

funcool/clojure.jdbcは、result-set->lazy-seqにアクセスしています。

ns

以下
(ns db-async-test.core-test 
    (:require [jdbc.core :as j] 
      [while-let.core :refer [while-let]] 
      [clojure.java.io :as io] 
      [clojure.data.csv :as csv] 
      [clojure.core.async :as a :refer [>!! <!! chan thread]] 
      [clojure.string :as str])) 

はreleventコードです。私はこのことを非同期チャネルでストリームすることができることに基づいています。私はこれを使って非同期減速器/トランスデューサで遊ぶことができるはずです。リーダー・スレッドの

機能:ライター・スレッドの

(defn db->chan [ch {:keys [sql db-spec]} ] 
    "Put db hash-maps onto ch." 
    (println "starting reader thread...") 
    (let [ 
     row-count   (atom 0) ; For state on rows 
     row-fn (fn[r] (do (>!! ch r) 
         ;; everything below is just for printing to stdout 
         (swap! row-count inc) 
         (when (zero? (mod @row-count 100)) 
          (println "Fetched " @row-count " rows."))))] 
    (with-open [conn (j/connection db-spec)] 
     (j/atomic conn 
       (with-open [cursor (j/fetch-lazy conn sql)] 
        (doseq [row (j/cursor->lazyseq cursor)] 
        (row-fn row))))) 
     (a/close! ch))) 

が機能:

(def config {:db-spec {:subprotocol "postgresql" 
         :subname "//mydbhost:5432/mydb" 
         :user "me" 
         :password "*****"} 
      :sql "select row_id, giant_xml_value::text from some_table"}) 

(do 
    (def ch (chan 1)) 
    (thread (db->chan ch config)) 
    (thread (chan->csv ch "./test.csv"))) 

(defn chan->csv [ch csv-file ] 
    "Read values off ch and write to csv file." 
    (println "starting writer thread...") 
    (def row-count (atom 0)) 
    (with-open [^java.io.Writer writer (io/writer csv-file 
             :append false :encoding "UTF-8")] 
    (while-let [data (<!! ch)] 
     (swap! row-count inc) 
     (csv/write-csv writer [data] :quote? (fn[x] false)) 
     (when (zero? (mod @row-count 100)) 
     (println "Wrote " @row-count " rows."))))) 

私は以下ではなく、個々の機能でthread部品を置きます

以下の出力は、両方のスレッドが同時に動作しているように見え、チャネルにデータをストリーミングし、そのチャネルからcsvにポップします。
giant_xml_columnでも、私のシステムはまだ膨大な量のメモリを使用していません。

starting fetch... 
starting csv write... 
Fetched 100 Wrote rows. 
100 
    rows. 
Fetched 200Wrote rows.200 

rows. 
Fetched 300 rows. 
Wrote 
...clip.... 
6000 rows. 
Fetched 6100 rows. 
Wrote 6100 rows. 
Fetched 6200 rows. 
Wrote 6200 rows. 
Fetched 6300 rows. 
Wrote 
6300 rows. 
Fetched 6400Wrote rows.6400 

rows. 
Fetched 6500 rows.Wrote 

6500 rows. 
関連する問題