2013-07-29 3 views
5

コーパス内のNGramの頻度を数えるプログラムを書く。どうすればよいコンジット:複数のストリームコンシューマ

tokens --- trigrams --- countFreq 

:私はちょうど1つのストリームの消費者は、ストリームソースに接続することができます現時点で

ngram :: Monad m => Int -> Conduit t m [t] 
trigrams = ngram 3 
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int) 

:私はすでにトークンのストリームを消費し、1つの注文のNGramsを生産する機能を持っています複数のストリームコンシューマを同じストリームソースに接続しますか? 私はこのような何かがしたい:

  .--- unigrams --- countFreq 
      |--- bigrams --- countFreq 
tokens ----|--- trigrams --- countFreq 
      '--- ...  --- countFreq 

Aを加えたパラレル

EDIT内の各消費者を実行するために、次のようになります。私は、このソリューション

spawnMultiple orders = do 
    chan <- atomically newBroadcastTMChan 

    results <- forM orders $ \_ -> newEmptyMVar 
    threads <- forM (zip results orders) $ 
         forkIO . uncurry (sink chan) 

    forkIO . runResourceT $ sourceFile "test.txt" 
         $$ javascriptTokenizer 
         =$ sinkTMChan chan 

    forM results readMVar 

    where 
     sink chan result n = do 
      chan' <- atomically $ dupTMChan chan 
      freqs <- runResourceT $ sourceTMChan chan' 
           $$ ngram n 
           =$ frequencies 
      putMVar result freqs 
を思い付いたペトルに
感謝
+0

'tokens'が値を返すとき、あなたの' ... grams 'はすべてそれを受け取ることを望みますか? –

答えて

5

すべてのシンクがすべての値を受け取るようにしたいと思っています。

私がお勧めしたい:

  1. 使用newBroadcastTMChanを新しいチャネルControl.Concurrent.STM.TMChan(STM-ちゃんず)を作成します。
  2. このチャネルを使用して、Data.Conduit.TMChan(stm-conduit)のsinkTBMChanをメインプロデューサに使用してシンクを構築します。
  3. 各クライアントでは、読み取り専用のコピーを作成するのにdupTMChanを使用します。 sourceTBMChanを使用してこのコピーを読む新しいスレッドを開始します。
  4. スレッドから結果を収集する。
  5. クライアントが作成したデータをできるだけ早く読み込めるようにしてください。そうしないと、ヒープオーバーフローが発生する可能性があります。

(私はそれを試していませんが、私たちはそれがどのように動作するかを知ってみましょう。)


更新:あなたは結果を収集できるか一つの方法は、各コンシューマスレッドのMVarを作成することです。完成した後、それぞれの結果はputMVarになります。そしてあなたのメインスレッドはtakeMVarであり、すべてのスレッドが終了するのを待っているので、MVarです。たとえば、varsMVarのリストである場合、メインスレッドはmapM takeMVar varsを発行してすべての結果を収集します。

+0

答えをいただきありがとうございます。forkIOでスレッドを生成するとどうすれば結果が得られますか? – SvenK

+0

@SvenK私は、結果を収集する方法のアイデアで答えを更新しました。 –

+0

TMChanには放送版があり、TBMChanでは、なぜnewBroadcastTBMChanが見つかりませんか? – CMCDragonkai

関連する問題