編集:あなたが尋ねていたことを誤解しました。 あなたはこれをパイプの中で行うことができるかもしれませんが、動機づけが何であるかはわかりません。私は再利用可能なパイプチェーンを構築することをお勧めしたいと思います。パイプ自体に組み込むと、最初のインが最初のアウトであることを保証する注文保証は失われます。
Work Stealingのセクションはあなたが探しているものですが、このコードは基本的にチュートリアルとはまったく同じですが、どのように動作するかを解説しましょう。
module Main(main) where
import Pipes
import Pipes.Concurrent
import Control.Concurrent.Async (async, wait)
import Control.Concurrent (threadDelay)
import Control.Monad (forM)
a :: Producer Int IO()
a = each [1..10]
b :: Pipe Int Int IO()
b = do
x <- await
yield (x*2)
b
c :: Consumer Int IO()
c = do
x <- await
lift $ print x
c
main :: IO()
main = do
(output, input) <- spawn unbounded
feeder <- async $ do runEffect $ a >-> toOutput output
performGC
workers <- forM [1..3] $ \i ->
async $ do runEffect $ fromInput input >-> b >-> c
performGC
mapM_ wait (feeder:workers)
spawn unbounded
がPipes.Concurrentからである最初の行は、それが入力と出力のハンドルを持って「メールボックス」を初期化します。ここでは、我々はあなたがやりたいことができ一つの方法です。最初は私を混乱させましたが、この場合はメッセージを出力に送り、入力からそれらを引き出します。これはgolangのような言語のプッシュプルメッセージチャネルに似ています。
Bufferと指定して、保存できるメッセージの数を指定します。この場合、無制限で無制限を設定します。
メールボックスが初期化されるように、メッセージを送信するEffect
を作成できるようになりました。メールボックスチャネルはSTMを使用して実装されているので、メッセージを非同期で収集する方法です。
メールボックスにフィードする非同期ジョブを作成しましょう。
feeder <- async $ do runEffect $ a >-> toOutput output
performGC
a >-> toOutput output
は普通のパイプ組成物である、我々はパイプに出力を変換するtoOutput
を必要としています。 IOの一部でもあるperformGC
コールに注意してください。これにより、Pipes.Concurrentは、ジョブの完了後にクリーンアップを行うことができます。私たちが好きな場合はforkIO
を使ってこれを実行できますが、この場合は後で結果が終わるのを待つことができるようにasync
を使います。さて、私たちのメールボックスは非同期にメッセージを受信する必要がありますので、メッセージを受信して、それらを取り出していくつかの作業をしましょう。
workers <- forM [1..3] $ \i ->
async $ do runEffect $ fromInput input >-> b >-> c
performGC
これまでの考え方と同じですが、今回はちょうどその一部を産んでいます。私たちは通常のパイプのように入力から読み取ってfromInput
を使用してから、チェーンの残りの部分を実行して、完了したらクリーンアップします。 input
は、1人の作業者だけが値を受け取るたびに値が引き出されることを保証します。 output
に入るすべてのジョブが完了すると(開いているすべてのジョブが追跡されます)、input
パイプが閉じられ、作業者が終了します。
ウェブワーカーのシナリオでこれを使用している場合は、toOutput output
チャンネルにリクエストを送信し続け、好きなだけ多くのワーカーを投稿して、fromInput input
からパイプラインにプルするメインループがあります。
https://hackage.haskell.org/package/pipes-async-0.1.1/docs/src/Pipes-Async.html#bufferは、複数のパイプをTQueueにフィードするように調整できるようですが、そこで '[Pipe ab] - > Pipe ab'を取得します。 1時間前に[機能リクエスト](https://github.com/jwiegley/pipes-async/issues/1)を投稿しました>:D – Gurkenglas