2011-03-06 10 views
4

私はHaskellスレッドを使用していますが、チャネル全体でレイジー評価された値を伝達するという問題に取り組んでいます。たとえば、N個のワーカースレッドと1個の出力スレッドでは、ワーカーは未評価の作業を通信し、出力スレッドはそれらの作業を終了します。Haskellでの同時チャネルの厳密な評価手法

さまざまなドキュメントでこの問題について読んだことがありますが、さまざまな解決策がありましたが、私は1つの解決策しか見つけられず、残りの解決策は見つからなかった。以下は、ワーカースレッドが長い時間がかかる計算を開始するコードです。降順でスレッドを開始するので、最初のスレッドは最長で、後のスレッドは先に終了する必要があります。

import Control.Concurrent (forkIO) 
import Control.Concurrent.Chan -- .Strict 
import Control.Concurrent.MVar 
import Control.Exception (finally, evaluate) 
import Control.Monad (forM_) 
import Control.Parallel.Strategies (using, rdeepseq) 

main = (>>=) newChan $ (>>=) (newMVar []) . run 

run :: Chan (Maybe String) -> MVar [MVar()] -> IO() 
run logCh statVars = do 
    logV <- spawn1 readWriteLoop 
    say "START" 
    forM_ [18,17..10] $ spawn . busyWork 
    await 
    writeChan logCh Nothing -- poison the logger 
    takeMVar logV 
    putStrLn "DONE" 
    where 
    say mesg = force mesg >>= writeChan logCh . Just 

    force s = mapM evaluate s -- works 
-- force s = return $ s `using` rdeepseq -- no difference 
-- force s = return s -- no-op; try this with strict channel 

    busyWork = say . show . sum . filter odd . enumFromTo 2 . embiggen 
    embiggen i = i*i*i*i*i 

    readWriteLoop = readChan logCh >>= writeReadLoop 
    writeReadLoop Nothing = return() 
    writeReadLoop (Just mesg) = putStrLn mesg >> readWriteLoop 

    spawn1 action = do 
     v <- newEmptyMVar 
     forkIO $ action `finally` putMVar v() 
     return v 

    spawn action = do 
     v <- spawn1 action 
     modifyMVar statVars $ \vs -> return (v:vs,()) 

    await = do 
     vs <- modifyMVar statVars $ \vs -> return ([], vs) 
     mapM_ takeMVar vs 

ほとんどの手法を使用して、結果は生成された順序で報告されます。すなわち、最も長く実行されている計算が最初に行われます。私は、出力スレッドがすべての作業を行っていることを意味するためにこれを解釈:

-- results in order spawned (longest-running first = broken) 
START 
892616806655 
503999185040 
274877906943 
144162977343 
72313663743 
34464808608 
15479341055 
6484436675 
2499999999 
DONE 

私はこの答えは、厳しいチャンネル、だろうと思ったが、彼らは動作しませんでした。文字列のWHNFは、最外のコンストラクタ(文字列の最初の文字のnilまたはcons)を強制するだけで十分ではないことを理解しています。 rdeepseqは完全に評価されるはずですが、違いはありません。私が見つけた唯一のことは、文字列内のすべての文字にControl.Exception.evaluate :: a -> IO aをマップすることです。 (いくつかの異なる選択肢のコードでforce機能のコメントを参照してください)ここでの結果はControl.Exception.evaluateである:

-- results in order finished (shortest-running first = correct) 
START 
2499999999 
6484436675 
15479341055 
34464808608 
72313663743 
144162977343 
274877906943 
503999185040 
892616806655 
DONE 

なぜ厳格なチャネルまたはrdeepseqは、この結果を生成しませんか?他のテクニックはありますか?なぜ私は最初の結果が壊れているのか誤解していますか?

答えて

5

ここには2つの問題があります。

(明示的rnfを使用して)最初の試みがうまくいかない理由はreturnを使用することによって、あなたはそれが評価されたとき、完全に自分自身を評価し、サンクが、されていませんサンク自体を作成した、ということです評価される。それはIOに値を返すという事実はevaluateは順序を課すことができることを意味します:

return (error "foo") >> return 1 == return 1 
evaluate (error "foo") >> return 1 == error "foo" 

結論は、このコードということである:評価の種類がa -> IO aであることに注意してください

force s = evaluate $ s `using` rdeepseq 

が動作します(と)は、mapM_ evaluate sと同じ動作をします。


厳密なチャネルを使用する場合は、ややこしいですが、これは厳密な並行性のバグが原因であると考えられます。高価な計算は実際にワーカースレッド上で実行されていますが、あまり効果がありません(文字列に非同期例外をいくつか隠し、例外が発生したスレッドを確認することでこれを明示的に確認できます)。

バグとは何ですか?のは厳しいwriteChanのコードを見てみましょう:

writeChan :: NFData a => Chan a -> a -> IO() 
writeChan (Chan _read write) val = do 
    new_hole <- newEmptyMVar 
    modifyMVar_ write $ \old_hole -> do 
    putMVar old_hole $! ChItem val new_hole 
    return new_hole 

私たちは、サンクを評価する前にmodifyMVar_writeに呼び出されたことがわかります。一連の操作は次のようになります。

  1. writeChanは、我々は上に高価なサンクを置く高価サンク
  2. を評価我々はtakeMVar write
  3. (チャンネルに書きたい誰をブロックする)
  4. を入力します他のすべてのスレッドをブロック解除たちputMVar writeチャネル

ロックが取得される前に評価を実行するため、evaluateバリアントではこの動作が表示されません。

私はこれについてDon mailを送って、彼がこの行動が一種のものではないと同意するかどうかを確認します。

Donは、この動作が準最適であることに同意します。私たちはパッチに取り組んでいます。

+0

2番目の部分は非常に興味深いです(まだ1番目を熟考しています)。だから、厳密なチャンネルはスレッドを送信すると評価されますが、最初にチャンネルの*予約*をしますか?作業者は望む順番で作業を終えることができますが、チャンネルはFCFSを(FFFS =最初の作業ではなく、最初の作業ではなく)シリアル化します。ありがとう。 – chrisleague

+0

予約はお勧めです。 –

+0

補足として、リストのすべての要素の評価を強制するdeepseqは必ずしも必要ではありません。 'length xs 'seq' xs'で行うことができるリストの背骨だけを評価するのに十分なことがよくあります – sclv