MVar note

A short note about using MVars in Haskell. Source is here: https://github.com/carlohamalainen/playground/tree/master/haskell/mvar.

Unlike earlier blog posts, this one should be built using Stack. Something like:

git clone https://github.com/carlohamalainen/playground.git
cd playground/haskell/mvar
stack build

Then use stack ghci instead of cabal repl. The main executable is .stack-work/dist/x86_64-linux/Cabal-1.22.4.0/build/mvar-exe/mvar-exe.

> {-# LANGUAGE ScopedTypeVariables #-}
>
> module Main where
>
> import Control.Concurrent
> import Control.Monad
> import Control.Concurrent.ParallelIO.Local
> import System.IO

Here is the situation: we have a function that makes a call to some restricted resource, say a network API, and we would like calls to this API from our application to be serialized across multiple threads. For the purposes of this blog post, here is a dummy function that sleeps a bit and returns x + 1. Pretend that it’s calling a magical API on the network somewhere.

> getExpensiveThing :: Int -> IO Int
> getExpensiveThing x = do
>   threadDelay $ 1 * 10^6
>   return $ x + 1

We have a general task that makes use of the expensive resource:

> doThings :: Int -> IO ()
> doThings tid = do
>   x <- getExpensiveThing tid
>   putStrLn $ "doThings: thread " ++ show tid ++ " got " ++ show x

At the top level we need to run a few doThings in parallel:

> main0 :: IO ()
> main0 = do
>   hSetBuffering stdout LineBuffering -- Otherwise output is garbled.
>
>   let tasks = map doThings [1..5]
>
>   withPool 4 $ \pool -> parallel_ pool tasks

The problem with main0 is that the calls to getExpensiveThing can happen simultaneously, so we need to use some kind of thread synchronisation primitive. I initially thought that I’d have to use a semaphore, a queue, or something fancy, but an MVar can do the trick.

We only need three operations on MVar:

Use newEmptyMVar to create a new MVar which is initially empty:

> newEmptyMVar :: IO (MVar a)

Use takeMVar to get the contents of the MVar. If the MVar is empty, takeMVar will wait until it is full.

> takeMVar :: MVar a -> IO a

Finally, use putMVar to put a value into an MVar. If the MVar is full, then putMVar will wait until the MVar is empty. If multiple threads are blocked on an MVar, they are woken up in FIFO order.

> putMVar :: MVar a -> a -> IO ()
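As a quick illustration of how these three operations fit together, here is a minimal sketch that hands a value from a forked thread back to its caller. The name handOff and the 0.1 second delay are my own choices for the example, not something from the repository.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- | Hand a value from a forked thread back to the caller.
-- 'handOff' is a hypothetical name used only for this illustration.
handOff :: Int -> IO Int
handOff n = do
  box <- newEmptyMVar          -- The MVar starts out empty.
  _ <- forkIO $ do
    threadDelay 100000         -- Pretend to work for 0.1 seconds.
    putMVar box (n * 2)        -- Fill the MVar, waking the waiting taker.
  takeMVar box                 -- Block until the forked thread has filled it.
```

In GHCi, handOff 21 blocks briefly and then returns 42. The caller never polls: takeMVar simply waits until the other thread calls putMVar.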

So what we can do is have getExpensiveThing use takeMVar to block until a worker requires a value. The return value can be passed back via another MVar, which the worker is itself waiting on. The data type MVar is polymorphic in its type parameter, so there is no trouble in having an MVar of an MVar, or an MVar of a tuple containing another MVar, and so on. This is what we’ll use to represent a blocking action with input value of type a and output value of type b:

> data InOut a b = InOut (MVar (a, MVar b))

The outer MVar wraps a tuple, where the first component is the raw input value of type a, and the second component is the MVar in which the result value will be passed back. Here is the new getExpensiveThing:

> getExpensiveThing' :: InOut Int Int -> IO ()
> getExpensiveThing' (InOut io) = forever $ do
>   ((input :: Int), (output :: MVar Int)) <- takeMVar io
>   threadDelay $ 1 * 10^6
>   putMVar output (input + 1)

The output MVar is contained inside the top-level MVar. This way, getExpensiveThing' has a unique channel back to the calling function. I used ScopedTypeVariables to be able to write the types of input and output inline, but this is just for clarity in this blog post. Also note that getExpensiveThing' runs forever using forever from Control.Monad.
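Nothing in this loop depends on what the worker actually computes, so the same shape can probably be factored out. The following is a rough sketch under that assumption: serve and demo are hypothetical names that do not appear in the repository, and InOut is repeated only so that the block stands alone.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forever)

-- Same shape as the InOut type in this post, repeated so the block stands alone.
data InOut a b = InOut (MVar (a, MVar b))

-- | Hypothetical helper: answer requests one at a time using the given action.
serve :: (a -> IO b) -> InOut a b -> IO ()
serve f (InOut io) = forever $ do
  (input, output) <- takeMVar io   -- Wait for the next request.
  result <- f input                -- Only this thread ever runs f.
  putMVar output result            -- Reply on the caller's private MVar.

-- | One request/response round trip against a freshly forked worker.
demo :: (a -> IO b) -> a -> IO b
demo f x = do
  top <- newEmptyMVar
  _ <- forkIO $ serve f (InOut top)
  reply <- newEmptyMVar
  putMVar top (x, reply)
  takeMVar reply
```

With a helper like this, getExpensiveThing' would amount to serve getExpensiveThing. I kept the explicit version in this post because it shows exactly where takeMVar and putMVar happen.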

Here is the updated doThings that uses the MVar to communicate with getExpensiveThing':

> doThings' :: InOut Int Int -> Int -> IO ()
> doThings' (InOut io) tid = do
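>   result <- newEmptyMVar      -- Fresh MVar for our result.
>   putMVar io (tid, result)    -- Send our input (tid) and the result MVar.
>   x <- takeMVar result        -- Block until the worker replies.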
>   putStrLn $ "doThings': thread " ++ show tid ++ " got " ++ show x

Finally, main needs a top-level MVar, which is the first parameter to doThings', and a forked thread to run getExpensiveThing':

> main :: IO ()
> main = do
>   hSetBuffering stdout LineBuffering -- Otherwise output is garbled.
>
>   topMVar <- newEmptyMVar
>
>   _ <- forkIO $ getExpensiveThing' (InOut topMVar)
>
>   let tasks = map (doThings' (InOut topMVar)) [1..5]
>
>   withPool 4 $ \pool -> parallel_ pool tasks

Now each evaluation of threadDelay (the sensitive bit of code that represents a call to a resource) happens sequentially, one request at a time, although the order in which the threads are served is nondeterministic.
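One way to convince yourself of the "sequentially" part is to instrument the expensive call and record how many calls are ever in flight at once. This is only a sketch: tracked and peakViaWorker are hypothetical names, the delay is shortened to 10 ms, and I use plain forkIO instead of the parallel-io pool so that the example needs nothing beyond base.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM, forever)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- | Stand-in for the expensive call, instrumented to track overlapping calls.
tracked :: IORef Int -> IORef Int -> Int -> IO Int
tracked inFlight peak x = do
  n <- atomicModifyIORef' inFlight (\k -> (k + 1, k + 1))  -- Enter.
  atomicModifyIORef' peak (\p -> (max p n, ()))            -- Remember the maximum.
  threadDelay 10000                                        -- The "expensive" part.
  atomicModifyIORef' inFlight (\k -> (k - 1, ()))          -- Leave.
  return (x + 1)

-- | Start the given number of client threads, all calling through one worker.
-- Returns the results in client order and the peak number of overlapping calls.
peakViaWorker :: Int -> IO ([Int], Int)
peakViaWorker clients = do
  inFlight <- newIORef 0
  peak <- newIORef 0
  top <- newEmptyMVar
  -- The single worker thread: the only place where tracked is called.
  _ <- forkIO $ forever $ do
    (input, output) <- takeMVar top
    putMVar output =<< tracked inFlight peak input
  -- Each client sends (tid, reply) and reports its answer via its own MVar.
  dones <- forM [1 .. clients] $ \tid -> do
    done <- newEmptyMVar
    _ <- forkIO $ do
      reply <- newEmptyMVar
      putMVar top (tid, reply)
      takeMVar reply >>= putMVar done
    return done
  results <- mapM takeMVar dones
  p <- readIORef peak
  return (results, p)
```

Here peakViaWorker 5 returns ([2,3,4,5,6],1): all five clients get an answer, and the peak is 1 because only the worker thread ever runs the instrumented call.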

$ stack build && .stack-work/dist/x86_64-linux/Cabal-1.22.4.0/build/mvar-exe/mvar-exe
doThings': thread 1 got 2
doThings': thread 5 got 6
doThings': thread 2 got 3
doThings': thread 4 got 5
doThings': thread 3 got 4

Just for fun, let’s make some helper functions to make calling a special worker via an MVar a bit cleaner. In general, calling a worker requires creating a results MVar, pushing the input and results MVar to the InOut MVar, and finally taking the result.

> callWorker :: InOut a b -> a -> IO b
> callWorker (InOut m) a = do
>     result <- newEmptyMVar
>     putMVar m (a, result)
>     takeMVar result

To save ourselves having to fork a worker, we can write a combinator that takes a worker and an action and runs the action with access to the newly created MVar:

> withWorker :: (InOut a b -> IO ()) -> (InOut a b -> IO c) -> IO c
> withWorker worker action = do
>   m <- newEmptyMVar
>   let io = InOut m
>   _ <- forkIO $ worker io
>   action io

Now doThings'' is a bit shorter, at the expense of not knowing (at a glance) what the io thing is going to do.

> doThings'' :: InOut Int Int -> Int -> IO ()
> doThings'' io tid = do
>   x <- callWorker io tid
>   putStrLn $ "doThings'': thread " ++ show tid ++ " got " ++ show x

Finally, main' is largely unchanged except for withWorker at the top level.

> main' :: IO ()
> main' = withWorker getExpensiveThing' $ \io -> do
>   hSetBuffering stdout LineBuffering -- Otherwise output is garbled.
>
>   let tasks = map (doThings'' io) [1..5]
>
>   withPool 4 $ \pool -> parallel_ pool tasks

Running main':

*Main> main'
doThings'': thread 2 got 3
doThings'': thread 3 got 4
doThings'': thread 4 got 5
doThings'': thread 1 got 2
doThings'': thread 5 got 6
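
One thing withWorker does not do is stop the worker thread once the action has finished. In this program that is harmless, because the process exits when main returns. In a longer-running program it might be worth tying the worker's lifetime to the action. The following is a sketch of one possible way, using bracket and killThread from base. The names withWorkerScoped and doubler are hypothetical, and InOut and callWorker are repeated so that the block stands alone.

```haskell
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (bracket)
import Control.Monad (forever)

-- Repeated from the post so the block stands alone.
data InOut a b = InOut (MVar (a, MVar b))

callWorker :: InOut a b -> a -> IO b
callWorker (InOut m) a = do
  result <- newEmptyMVar
  putMVar m (a, result)
  takeMVar result

-- | Hypothetical variant of withWorker: the worker thread is killed when the
-- action returns or throws an exception.
withWorkerScoped :: (InOut a b -> IO ()) -> (InOut a b -> IO c) -> IO c
withWorkerScoped worker action = do
  m <- newEmptyMVar
  let io = InOut m
  bracket (forkIO (worker io)) killThread (\_ -> action io)

-- | Tiny worker used only for this demonstration.
doubler :: InOut Int Int -> IO ()
doubler (InOut io) = forever $ do
  (input, output) <- takeMVar io
  putMVar output (input * 2)
```

For example, withWorkerScoped doubler (\io -> callWorker io 21) returns 42, and the doubler thread is gone by the time the result comes back.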