2017-01-13 11 views
4

postgresql-simpleは、ストリーミングクエリの機能を提供します。postgresql-simpleを使用したスト​​リーミングコンジットソースの作成

fold 
    :: (FromRow row, ToRow params) 
    => Connection -> Query -> params -> a -> (a -> row -> IO a) -> IO a 

ストリーミングを最大限活用するコンジットソースを作成したいと考えています。 IOfoldに(と思う?)反変な位置に表示されますので、

mySource :: (FromRow row, Monad m) => Source m row 

残念ながら、私は本当に種類に苦しんでいます。次の型チェックでは、値を生成する前にストリーム全体を折りたたみます。

getConduit :: Connection -> IO (C.ConduitM() Event IO()) 
getConduit conn = fold_ conn queryEventRecord CL.sourceNull foo 
    where 
    foo :: C.ConduitM() Event IO() -> Event -> IO (C.ConduitM() Event IO()) 
    foo cond evt = pure (cond >> C.yield evt) 

これを実装する方法についてのあらゆる指針は非常に高く評価されます。ありがとう!最終的にするためにstm-conduitを使用

+0

、我々はこの動作を実装するために[postgresqlの-のlibpq](https://www.stackage.org/package/postgresql-libpq)にまで低下しました。私はpostgresql-simpleがこれを行うことができるとは思っていません(@ TMChanのアプローチ@Alecの言及に加えて)。 –

+0

関数 '[query']を持つ' pipes-postgresql-simple'があります(https://hackage.haskell.org/package/pipes-postgresql-simple-0.1.2.0/docs/Pipes-PostgreSQL-Simple .html#v:query)これについては、 [produceIO'](https://github.com/ocharles/pipes-postgresql-simple/blob)の実装で見ることができるように、基本的に@Alecの戦略( 'pipes-concurrency'で実装されています)を使用しています。 /master/src/Pipes/PostgreSQL/Simple.hs#L117) – Michael

答えて

5

これについて移動するための一つの(とても素敵ではない)方法でそれ

  • はちょうどこのチャンネル
  • に行をダンプするforeach_を設定した行を受け取るために新しいTMChanを作りますチャネル外発信元

私はこのオフハンドをテストする手段はありませんが、次のように動作するはずです

import Conduit 
import Database.PostgreSQL.Simple (foreach_) 
import Data.Conduit.TMChan (sourceTMChan) 
import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan, atomically) 

mySource :: (FromRow row, MonadIO m) => Connection -> Query -> IO (Source m row) 
mySource connection query = do 
    chan <- newTMChanIO 
    forEach_ connection query (atomically . writeTMChan chan) 
    pure (sourceTMChan chan) 

だけ私たちは、これがより簡単かもしれないforEach_ :: (MonadIO m, FromRow r) => Connection -> Query -> (r -> m()) -> m()を持っていた場合...

0

ここではそのコンパイルし、実行します上記のアレックさんの変更があります。 mkPgSourceは、彼の記事の最後にアレックが言及した一般的な機能です。永続-PostgreSQLの

import Database.PostgreSQL.Simple 
import Database.PostgreSQL.Simple.FromRow 
import Database.PostgreSQL.Simple.ToRow 
import Control.Monad.IO.Class (MonadIO) 
import Data.Conduit.TMChan (sourceTMChan) 
import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan, 
closeTMChan, TMChan) 
import GHC.Conc (atomically, forkIO) 
import Conduit 

--closes the channel after action is done to terminate the source 
mkPgSource :: (MonadIO m, FromRow r) => ((r -> IO()) -> IO()) -> IO (Source m r) 
mkPgSource action = do 
    chan <- newTMChanIO 
    _ <- forkIO $ do action $ atomically . (writeTMChan chan) 
       atomically $ closeTMChan chan 
    pure $ sourceTMChan chan 

sourceQuery :: (ToRow params, FromRow r, MonadIO m) => 
    Connection -> Query -> params -> IO (Source m r) 
sourceQuery conn q params = mkPgSource $ forEach conn q params 

sourceQuery_ :: (FromRow r, MonadIO m) => Connection -> Query -> IO 
(Source m r) 
sourceQuery_ conn q = mkPgSource $ forEach_ conn q 
関連する問題