A short note about using MVars in Haskell. Source is here: https://github.com/carlohamalainen/playground/tree/master/haskell/mvar.
Unlike earlier blog posts, this one should be built using Stack. Something like:
git clone https://github.com/carlohamalainen/playground.git
cd playground/haskell/mvar
stack build
Then use stack ghci
instead of cabal repl
. The main executable is .stack-work/dist/x86_64-linux/Cabal-1.22.4.0/build/mvar-exe/mvar-exe
.
{-# LANGUAGE ScopedTypeVariables #-}
module Main where
import Control.Concurrent
import Control.Monad
import Control.Concurrent.ParallelIO.Local
import System.IO
Here is the situation: we have a function that makes a call to some
restricted resource, say a network API, and we would like calls to this
API from our application to be serialized across multiple threads.
For the purposes of this blog post, here is a dummy function that
sleeps a bit and returns x + 1
. Pretend that it’s calling
a magical API on the network somewhere.
getExpensiveThing :: Int -> IO Int
getExpensiveThing x = do
threadDelay $ 1 * 10^6
return $ x + 1
We have a general task that makes use of the expensive resource:
doThings :: Int -> IO ()
doThings tid = do
x <- getExpensiveThing tid
putStrLn $ "doThings: thread " ++ show tid ++ " got " ++ show x
At the top level we need to run a few doThings
in parallel:
main0 :: IO ()
main0 = do
hSetBuffering stdout LineBuffering -- Otherwise output is garbled.
let tasks = map doThings [1..5]
withPool 4 $ \pool -> parallel_ pool tasks
The problem with main0
is that the calls to getExpensiveThing
can happen simultaneously, so we need to use some kind of thread synchronisation primitive. I initially thought that I’d have to use a semaphore, a queue, or something fancy, but an MVar can do the trick.
We only need three operations on MVar
:
Use newEmptyMVar
to create a new MVar
which is initially empty:
newEmptyMVar :: IO (MVar a)
Use takeMVar
to get the contents of the MVar
. If the MVar
is empty, takeMVar
will wait until it is full.
takeMVar :: MVar a -> IO a
Finally, use putMVar
to put a value into an MVar
. If the MVar
is full, then putMVar
will wait
until the MVar
is empty. If multiple threads are blocked on an MVar
, they are woken up in FIFO order.
putMVar :: MVar a -> a -> IO ()
So what we can do is have getExpensiveThing
use takeMVar
to block until a worker requires a value. The return value can be passed back via another MVar
, which the worker is itself waiting on. The data type MVar
is polymorphic in its type parameter, so there is no trouble
in having an MVar
of an MVar
, or an MVar
of a tuple containing another MVar
, and so on. This is what we’ll use to represent a blocking action with input value of type a
and output value of type b
:
data InOut a b = InOut (MVar (a, MVar b))
The outer MVar
wraps a tuple, where the first component is the raw input value of type a
, and the
second component is the MVar
in which the result value will be passed back. Here is the new getExpensiveThing
:
getExpensiveThing' :: InOut Int Int -> IO ()
getExpensiveThing' (InOut io) = forever $ do
((input :: Int), (output :: MVar Int)) <- takeMVar io
threadDelay $ 1 * 10^6
putMVar output (input + 1)
The output MVar
is contained inside the
top level MVar
. This way, getExpensiveThing'
has a unique channel back to the calling function. I used ScopedTypeVariables
to be able to write the types of input
and output
inline, but this is just for clarity in this blog post. Also note that getExpensiveThing'
runs forever using forever from Control.Monad.
Here is the updated doThings
that uses the MVar
to communicate with getExpensiveThing'
:
doThings' :: InOut Int Int -> Int -> IO ()
doThings' (InOut io) tid = do
result <- newEmptyMVar -- For our result.
putMVar io (tid, result) -- Send our input (tid) and the result MVar.
x <- takeMVar result -- Get the value from the result MVar.
putStrLn $ "doThings': thread " ++ show tid ++ " got " ++ show x
Finally, main
needs a top-level MVar
which is the first parameter to doThings'
and a forked thread to run getExpensiveThing'
:
main :: IO ()
main = do
hSetBuffering stdout LineBuffering -- Otherwise output is garbled.
topMVar <- newEmptyMVar
_ <- forkIO $ getExpensiveThing' (InOut topMVar)
let tasks = map (doThings' (InOut topMVar)) [1..5]
withPool 4 $ \pool -> parallel_ pool tasks
Now each evaluation of threadDelay
(the sensitive bit of code that represents a call to a resource) happens sequentially`` although the order is nondeterministic.
$ stack build && .stack-work/dist/x86_64-linux/Cabal-1.22.4.0/build/mvar-exe/mvar-exe
doThings': thread 1 got 2
doThings': thread 5 got 6
doThings': thread 2 got 3
doThings': thread 4 got 5
doThings': thread 3 got 4
Just for fun, let’s make some helper functions to make calling a special worker via an MVar
a bit cleaner. In general,
calling a worker requires creating a results MVar
, pushing the input and results MVar
to the InOut
MVar
,
and finally taking the result.
callWorker :: InOut a b -> a -> IO b
callWorker (InOut m) a = do
result <- newEmptyMVar
putMVar m (a, result)
takeMVar result
To save ourselves having to fork a worker, we can write a
combinator that takes a worker and an action and runs the action with
access to the newly created MVar
:
withWorker :: (InOut a b -> IO ()) -> (InOut a b -> IO c) -> IO c
withWorker worker action = do
m <- newEmptyMVar
let io = InOut m
_ <- forkIO $ worker io
action io
Now doThings''
is a bit shorter, at the expense of not knowing (at a glance) what the io
thing is going to do.
doThings'' :: InOut Int Int -> Int -> IO ()
doThings'' io tid = do
x <- callWorker io tid
putStrLn $ "doThings'': thread " ++ show tid ++ " got " ++ show x
Finally, main'
is largely unchanged except for withWorker
at the top level.
main' :: IO ()
main' = withWorker getExpensiveThing' $ \io -> do
hSetBuffering stdout LineBuffering -- Otherwise output is garbled.
let tasks = map (doThings'' io) [1..5]
withPool 4 $ \pool -> parallel_ pool tasks
Running main'
:
*Main> main'
doThings'': thread 2 got 3
doThings'': thread 3 got 4
doThings'': thread 4 got 5
doThings'': thread 1 got 2
doThings'': thread 5 got 6