A short note about using parallel-io to run shell commands in parallel from Haskell.

If you want to try out this blog post’s Literate Haskell source then your best bet is to compile in a sandbox which has various package versions fixed using the cabal.config file (via the cabal freeze command).
This is how to build the sandbox:

    git clone https://github.com/carlohamalainen/playground.git
    cd playground/haskell/parallel
    rm -fr .cabal-sandbox cabal.sandbox.config dist # start fresh
    cabal sandbox init
    cabal install --haddock-hyperlink-source --dependencies-only
    cabal install
    cabal repl

Also, note the line

    ghc-options: -threaded -rtsopts -with-rtsopts=-N

in parallel.cabal. Without the -with-rtsopts=-N option you would have to execute the binary using ./P +RTS -N.
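A quick way to check that the -N option took effect is to ask the runtime how many capabilities it was started with. This is only a minimal sketch; reportCapabilities is a name made up for illustration and is not part of the post's source.

```haskell
import Control.Concurrent (getNumCapabilities)

-- Hypothetical helper: print and return the number of capabilities, i.e.
-- how many Haskell threads the runtime can run truly simultaneously.
reportCapabilities :: IO Int
reportCapabilities = do
  n <- getNumCapabilities
  putStrLn ("Running with " ++ show n ++ " capabilities")
  return n
```

Built with -threaded and run with -N, this should normally report one capability per core; without those options it reports 1.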
Now, onto the actual blog post. First, a few imports to get us going.

> module Main where
>
> import Control.Concurrent
> import Control.Monad
> import Control.Concurrent.ParallelIO.Local
> import Data.Traversable
> import qualified Pipes.ByteString as B
> import qualified Data.ByteString as BS
> import qualified Data.ByteString.Lazy as BSL
> import Data.ByteString.Internal (w2c)
> import System.Exit
> import System.IO
> import System.Process.Streaming

In one of my work projects I often need to call legacy command line tools to process various imaging formats (DICOM, MINC, Nifti, etc). I used to use a plain call to createProcess and then readRestOfHandle to read the stdout and stderr, but I discovered that this can deadlock. A better approach is to use process-streaming. This is the current snippet that I use:

> -- Copied from https://github.com/carlohamalainen/imagetrove-uploader/blob/master/src/Network/ImageTrove/Utils.hs
> -- Run a shell command, returning Right with stdout if the command exited successfully
> -- and Left with stderr if there was an exit failure.
> runShellCommand :: FilePath -> [String] -> IO (Either String String)
> runShellCommand cmd args = do
>
>     (exitCode, (stdOut, stdErr)) <- execute (pipeoe (fromFold B.toLazyM) (fromFold B.toLazyM)) ((proc cmd args))
>
>     return $ case exitCode of
>         ExitSuccess   -> Right $ map w2c $ BS.unpack $ BSL.toStrict stdOut
>         ExitFailure e -> Left $ "runShellCommand: exit status " ++ show e ++ " with stdErr: "
>                                                                 ++ (map w2c $ BS.unpack $ BSL.toStrict $ stdErr)

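The usual cause of this kind of deadlock is that the child fills one pipe while the parent is still blocked reading the other. If pulling in process-streaming is not an option, a rough alternative is readProcessWithExitCode from the process package, which drains stdout and stderr concurrently. The sketch below is not the code used in the project; runShellCommandSimple is a hypothetical name, and unlike the version above it decodes output as a String using the locale encoding rather than working with raw bytes.

```haskell
import System.Exit (ExitCode (..))
import System.Process (readProcessWithExitCode)

-- Hypothetical alternative to runShellCommand using only the process package.
-- Right with stdout on success, Left with the exit status and stderr on failure.
runShellCommandSimple :: FilePath -> [String] -> IO (Either String String)
runShellCommandSimple cmd args = do
  -- The third argument is the text fed to the command's stdin (none here).
  (exitCode, stdOut, stdErr) <- readProcessWithExitCode cmd args ""
  return $ case exitCode of
    ExitSuccess   -> Right stdOut
    ExitFailure e -> Left ("runShellCommandSimple: exit status " ++ show e
                           ++ " with stdErr: " ++ stdErr)
```

Both streams are held fully in memory, so this is only reasonable for commands with modest output.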
Suppose we have a shell command that takes a while, in this case because it’s sleeping. Pretend that it’s IO bound.

> longShellCommand :: Int -> IO (Either String String)
> longShellCommand n = do
>     putStrLn $ "Running sleep command for " ++ show n ++ " second(s)."
>     runShellCommand "sleep" [show n ++ "s"]

We could run them in order:

> main1 :: IO ()
> main1 = do
>     -- Think of these as arguments to our long-running commands.
>     let sleepTimes = [1, 1, 1, 1]
>
>     forM_ sleepTimes longShellCommand

In Haskell we can think of IO as a data type that describes an IO action, so we can build up actions using ‘pure’ code and then execute them later. To make it a bit more explicit, here is a function for running an IO action:

> runIO :: IO a -> IO a
> runIO x = do
>     result <- x
>     return result

We can use it like this:

    *Main> let action = print 3 -- pure code, nothing happens yet
    *Main> runIO action         -- runs the action
    3

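To see that building a list of actions really does nothing until they are run, here is a small sketch that counts executions with an IORef. The names demo and bump are made up for illustration.

```haskell
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Build three copies of an action in pure code, then run them.
-- Returns the counter value before and after running the actions.
demo :: IO (Int, Int)
demo = do
  counter <- newIORef (0 :: Int)
  let bump    = modifyIORef counter (+ 1) -- a description only, not yet run
      actions = replicate 3 bump          -- still nothing has happened
  before <- readIORef counter
  sequence_ actions                       -- now the three actions execute
  after <- readIORef counter
  return (before, after)
```

Running demo gives (0, 3): the counter is untouched while the list is being built and only changes once sequence_ executes the actions.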
And we can rewrite
main1 like this:

> main2 :: IO ()
> main2 = do
>     let sleepTimes = [1, 1, 1, 1]
>
>     let actions = map longShellCommand sleepTimes :: [IO (Either String String)]
>
>     forM_ actions runIO

As an aside, runIO is equivalent to liftM id (see Control.Monad for info about liftM).
Now, imagine that you had a lot of these shell commands to execute and wanted a pool of, say, 4 workers. The parallel-io package provides
withPool which can be used like this:
withPool 4 $ \pool -> parallel_ pool [putStrLn "Echo", putStrLn " in parallel"]
Note that the IO actions (the
putStrLn fragments) are provided in a list. A list of IO actions. So we can run our shell commands in parallel like so:

> main3 :: IO ()
> main3 = do
>     let sleepTimes = [1, 1, 1, 1]
>
>     let actions = map longShellCommand sleepTimes :: [IO (Either String String)]
>
>     hSetBuffering stdout LineBuffering -- Otherwise output is garbled.
>
>     withPool 4 $ \pool -> parallel_ pool actions

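For intuition about what "at most 4 at a time" means, here is a rough sketch of bounded concurrency using only base. It is not how parallel-io is implemented and boundedParallel is a hypothetical name: it forks one thread per task straight away and uses a semaphore so that at most n of them run their task at once. Unlike parallel_, it also hands back each task's result, or the exception it threw.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (SomeException, bracket_, try)
import Control.Monad (forM)

-- Hypothetical helper: run the tasks with at most n executing at once,
-- returning results in the same order as the input list.
boundedParallel :: Int -> [IO a] -> IO [Either SomeException a]
boundedParallel n tasks = do
  sem <- newQSem n
  vars <- forM tasks $ \task -> do
    var <- newEmptyMVar
    _ <- forkIO $ do
      -- Hold a semaphore slot for the duration of the task, and record
      -- either its result or the exception it raised.
      r <- try (bracket_ (waitQSem sem) (signalQSem sem) task)
      putMVar var r
    return var
  -- Block until every task has reported back.
  mapM takeMVar vars
```

In real code withPool is the better choice, since it reuses a fixed set of worker threads instead of forking one per task.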
If we did this a lot we might define our own version of forM_ that uses withPool:

> parForM_ :: Int -> [IO a] -> IO ()
> parForM_ nrWorkers tasks = withPool nrWorkers $ \pool -> parallel_ pool tasks

> main4 :: IO ()
> main4 = do
>     let sleepTimes = [1, 1, 1, 1]
>
>     let actions = map longShellCommand sleepTimes :: [IO (Either String String)]
>
>     hSetBuffering stdout LineBuffering -- Otherwise output is garbled.
>
>     parForM_ 4 actions

Here is another example of building up some IO actions in pure form and then executing them later. Imagine that instead of a list of Ints for the sleep times, we have some actual sleep times and others that represent an error case. An easy way to model this is using Either, which by convention has the erroneous values in the Left and correct values in the Right.

> main5 :: IO ()
> main5 = do
>     let sleepTimes = [Right 1, Left "something went wrong", Right 2, Right 3]
>
>     let actions  = map (traverse longShellCommand) sleepTimes :: [IO (Either [Char] (Either String String))]
>         actions' = map (fmap join) actions                    :: [IO (Either [Char] String)]
>
>     hSetBuffering stdout LineBuffering -- Otherwise output is garbled.
>
>     parForM_ 4 actions'

In main5 we define actions by mapping a function over the sleep times, which are now of type Either String Int. We can’t apply longShellCommand directly because it expects an Int, so we use traverse longShellCommand instead (see Data.Traversable for the definition of traverse).
Next, the Either-of-Either is a bit clunky but we can mash the two layers together using join. Here we have to use fmap because we have list elements of type IO (Either [Char] (Either String String)), not Either [Char] (Either String String) as join might expect.
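The traverse-then-join step can be tried on its own without running any shell commands. In this sketch fakeCommand is a made-up stand-in for longShellCommand and runOne is a hypothetical name for the combined step.

```haskell
import Control.Monad (join)
import Data.Traversable (traverse)

-- Made-up stand-in for longShellCommand: succeeds for positive inputs.
fakeCommand :: Int -> IO (Either String String)
fakeCommand n
  | n > 0     = return (Right ("slept for " ++ show n))
  | otherwise = return (Left "non-positive sleep time")

-- traverse runs the command only on a Right input, producing
-- IO (Either String (Either String String)); fmap join then collapses
-- the two Either layers inside the IO.
runOne :: Either String Int -> IO (Either String String)
runOne = fmap join . traverse fakeCommand
```

A Left input never reaches fakeCommand at all: runOne (Left "x") simply returns Left "x", while runOne (Right 2) returns Right "slept for 2".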
One topic that I haven’t touched on is dealing with asynchronous exceptions. For this, have a read of Catching all exceptions from Snoyman and also enclosed-exceptions. Also, Chapter 13 of Parallel and Concurrent Programming in Haskell shows how to use the handy async package.

> -- Run all of the mains.
> main :: IO ()
> main = do
>
>     print "main1"
>     main1
>
>     print "main2"
>     main2
>
>     print "main3"
>     main3
>
>     print "main4"
>     main4
>
>     print "main5"
>     main5