Carlo Hamalainen


Note to self: parallel-io

2015-05-18

A short note about using parallel-io to run shell commands in parallel from Haskell. If you want to try out this blog post’s Literate Haskell source then your best bet is to compile in a sandbox which has various package versions fixed using the cabal.config file (via the cabal freeze command).

This is how to build the sandbox:

git clone https://github.com/carlohamalainen/playground.git
cd playground/haskell/parallel
rm -fr .cabal-sandbox cabal.sandbox.config dist # start fresh
cabal sandbox init
cabal install --haddock-hyperlink-source --dependencies-only
cabal install
cabal repl

Also, note the line

  ghc-options:         -threaded -rtsopts -with-rtsopts=-N

in parallel.cabal. Without those rtsopts options you would have to execute the binary using ./P +RTS -N.

Now, onto the actual blog post. First, a few imports to get us going.

> module Main where
> import           Control.Concurrent
> import           Control.Monad
> import           Control.Concurrent.ParallelIO.Local
> import           Data.Traversable
> import qualified Pipes.ByteString                           as B
> import qualified Data.ByteString                            as BS
> import qualified Data.ByteString.Lazy                       as BSL
> import           Data.ByteString.Internal (w2c)
> import           System.Exit
> import           System.IO
> import           System.Process.Streaming

In one of my work projects I often need to call legacy command line tools to process various imaging formats (DICOM, MINC, Nifti, etc). I used to use a plain call to createProcess and then readRestOfHandle to read the stdout and stderr but I discovered that it can deadlock and a better approach is to use process-streaming. This is the current snippet that I use:

> -- Copied from https://github.com/carlohamalainen/imagetrove-uploader/blob/master/src/Network/ImageTrove/Utils.hs
> -- Run a shell command, returning Right with stdout if the command exited successfully
> -- and Left with stderr if there was an exit failure.
> runShellCommand :: FilePath -> [String] -> IO (Either String String)
> runShellCommand cmd args = do
> 
>     (exitCode, (stdOut, stdErr)) <- execute (pipeoe (fromFold B.toLazyM) (fromFold B.toLazyM)) ((proc cmd args))
> 
>     return $ case exitCode of
>         ExitSuccess   -> Right $ map w2c $ BS.unpack $ BSL.toStrict stdOut
>         ExitFailure e -> Left $ "runShellCommand: exit status " ++ show e ++ " with stdErr: "
>                                                                 ++ (map w2c $ BS.unpack $ BSL.toStrict $ stdErr)

Suppose we have a shell command that takes a while, in this case because it’s sleeping. Pretend that it’s IO bound.

> longShellCommand :: Int -> IO (Either String String)
> longShellCommand n = do
>   putStrLn $ "Running sleep command for " ++ show n ++ " second(s)."
>   runShellCommand "sleep" [show n ++ "s"]

We could run them in order:

> main1 :: IO ()
> main1 = do
>   -- Think of these as arguments to our long-running commands.
>   let sleepTimes = [1, 1, 1, 1]
> 
>   forM_ sleepTimes longShellCommand

In Haskell we can think of IO as a data type that describes an IO action, so we can build it up using ‘pure’ code and then execute them later. To make it a bit more explicit, here is a function for running an IO action:

> runIO :: IO a -> IO a
> runIO x = do
>   result <- x
>   return result

We can use it like this:

*Main> let action = print 3 -- pure code, nothing happens yet
*Main> runIO action         -- runs the action
3

And we can rewrite main1 like this:

> main2 :: IO ()
> main2 = do
>   let sleepTimes = [1, 1, 1, 1]
> 
>   let actions = map longShellCommand sleepTimes :: [IO (Either String String)]
> 
>   forM_ actions runIO

As an aside, runIO is equivalent to liftM id (see Control.Monad for info about liftM).

Now, imagine that you had a lot of these shell commands to execute and wanted a pool of, say, 4 workers. The parallel-io package provides withPool which can be used like this:

withPool 4 $ \pool -> parallel_ pool [putStrLn "Echo", putStrLn " in parallel"]

Note that the IO actions (the putStrLn fragments) are provided in a list. A list of IO actions. So we can run our shell commands in parallel like so:

> main3 :: IO ()
> main3 = do
>   let sleepTimes = [1, 1, 1, 1]
> 
>   let actions = map longShellCommand sleepTimes :: [IO (Either String String)]
> 
>   hSetBuffering stdout LineBuffering -- Otherwise output is garbled.
> 
>   withPool 4 $ \pool -> parallel_ pool actions

If we did this a lot we might define our own version of forM_ that uses withPool:

> parForM_ :: Int -> [IO a] -> IO ()
> parForM_ nrWorkers tasks = withPool nrWorkers $ \pool -> parallel_ pool tasks
> main4 :: IO ()
> main4 = do
>   let sleepTimes = [1, 1, 1, 1]
> 
>   let actions = map longShellCommand sleepTimes :: [IO (Either String String)]
> 
>   hSetBuffering stdout LineBuffering -- Otherwise output is garbled.
> 
>   parForM_ 4 actions

Here is another example of building up some IO actions in pure form and then executing them later. Imagine that instead of a list of Ints for the sleep times, we have some actual sleep times and others that represent an error case. An easy way to model this is using Either, which by convention has the erroneous values in the Left and correct values in the Right.

> main5 :: IO ()
> main5 = do
>   let sleepTimes = [Right 1, Left "something went wrong", Right 2, Right 3]
> 
>   let actions  = map (traverse longShellCommand) sleepTimes :: [IO (Either [Char] (Either String String))]
>       actions' = map (fmap join) actions                    :: [IO (Either [Char] String)]
> 
>   hSetBuffering stdout LineBuffering -- Otherwise output is garbled.
> 
>   parForM_ 4 actions'

In main5 we define actions by mapping a function over the sleep times, which are are now of type Either String Int. We can’t apply longShellCommand directly because it expects an Int, so we use traverse longShellCommand instead (see Data.Traversable for the definition of traverse).

Next, the Either-of-Either is a bit clunky but we can mash them together using join. Here we have to use fmap because we have list elements of type IO (Either [Char] String), not Either [Char] String as join might expect.

One topic that I haven’t touched on is dealing with asynchronous exceptions. For this, have a read of Catching all exceptions from Snoyman and also enclosed-exceptions. Also, Chapter 13 of Parallel and Concurrent Programming in Haskell shows how to use the handy async package.

> -- Run all of the mains.
> main :: IO ()
> main = do
> 
>   print "main1"
>   main1
> 
>   print "main2"
>   main2
> 
>   print "main3"
>   main3
> 
>   print "main4"
>   main4
> 
>   print "main5"
>   main5