Note to self: exceptions in multithreaded Haskell

Note to self on catching exceptions in multithreaded Haskell code. The Literate Haskell source, build scripts and cabal files are at https://github.com/carlohamalainen/playground/tree/master/haskell/exceptions-in-parallel.

For my use cases there are two scenarios when running a list of worker threads:

  1. If any thread throws an exception, give up on everything.
  2. If any thread throws an exception, log it, but let the other workers run to completion.
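Both policies map onto the async package. The sketch below is not from the post: `worker`, `allOrNothing` and `logAndContinue` are hypothetical names, and it uses Control.Exception instead of Control.Monad.Catch to stay small. `mapConcurrently` gives scenario 1, because the first exception cancels the remaining workers and is rethrown. Wrapping each worker in `try` gives scenario 2.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (mapConcurrently)
import Control.Exception (SomeException, throwIO, try)

-- Hypothetical worker: fails on negative input, otherwise doubles it.
worker :: Int -> IO Int
worker n
  | n < 0     = throwIO (userError ("bad input " ++ show n))
  | otherwise = threadDelay 10000 >> return (2 * n)

-- Scenario 1: give up on everything. mapConcurrently cancels the other
-- workers as soon as one throws, then rethrows that exception here.
allOrNothing :: [Int] -> IO (Either SomeException [Int])
allOrNothing xs = try (mapConcurrently worker xs)

-- Scenario 2: log and carry on. Each worker catches its own exception,
-- so mapConcurrently never sees a failure and every worker completes.
logAndContinue :: [Int] -> IO [Either SomeException Int]
logAndContinue xs = do
  results <- mapConcurrently (try . worker) xs
  mapM_ report results
  return results
  where
    report (Left e)  = putStrLn ("worker failed: " ++ show e)
    report (Right _) = return ()

main :: IO ()
main = do
  r1 <- allOrNothing [1, -2, 3]
  putStrLn (either (const "batch aborted") show r1)
  r2 <- logAndContinue [1, -2, 3]
  print (map (either (const Nothing) Just) r2) -- [Just 2,Nothing,Just 6]
```

Catching SomeException in each worker also catches asynchronous exceptions such as cancellation, which the parallel-io note below mentions briefly.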

First, imports that we’ll use:

> {-# LANGUAGE DeriveDataTypeable, ScopedTypeVariables #-}
>
> module Main where
>
> import Data.Conduit
> import Data.Conduit.List
> import Data.Traversable (traverse)
> import Control.Applicative
> import Control.Monad.Catch
> import Control.Concurrent
> import Control.Concurrent.Async
> import Control.Concurrent.ParallelIO.Local
> import Control.Monad hiding (mapM, mapM_)
> import Data.Typeable
> import Prelude hiding (map, mapM, mapM_)
> import System.IO

We will use code from parallel-io and async for running worker threads. For a pipeline we’ll also use conduit.

Here’s our exception type, which we throw using throwM from Control.Monad.Catch:

> data MyException = MyException String deriving (Show, Typeable)
>
> instance Exception MyException
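Because MyException is an ordinary Exception instance, a handler can ask for it by type. A minimal sketch, with hypothetical helpers `classify` and `isMine`, showing `try` at the specific type and `fromException` for getting a MyException back out of a SomeException (like the ones parallelE and waitCatch return in the examples that follow):

```haskell
{-# LANGUAGE DeriveDataTypeable #-}
import Control.Monad.Catch (Exception (..), SomeException, throwM, try)
import Data.Typeable (Typeable)

data MyException = MyException String deriving (Show, Typeable)

instance Exception MyException

-- try at type MyException only catches MyException; any other
-- exception thrown by the action propagates to the caller.
classify :: IO () -> IO String
classify act = do
  r <- try act
  return $ case r of
    Left (MyException msg) -> "MyException: " ++ msg
    Right ()               -> "no exception"

-- Given only a SomeException, fromException recovers the concrete
-- type when it matches.
isMine :: SomeException -> Bool
isMine e = case fromException e :: Maybe MyException of
  Just _  -> True
  Nothing -> False

main :: IO ()
main = do
  classify (throwM (MyException "boom")) >>= putStrLn -- MyException: boom
  classify (return ()) >>= putStrLn                   -- no exception
  r <- try (throwM (MyException "boom")) :: IO (Either SomeException ())
  print (either isMine (const False) r)               -- True
```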

Our two tasks. The first task immediately throws an exception; the second waits for 5 seconds and completes happily.

> task1 :: IO String
> task1 = throwM $ MyException "task1 blew up"
>
> task2 :: IO String
> task2 = do
>   threadDelay $ 5 * 10^6
>   return $ "task2 finished"

Example: parallel_

> main1 :: IO ()
> main1 = do
>
>   x <- withPool 2 $ \pool -> parallel_ pool [task1, task2]
>   print (x :: ())

Output:

*Main> main1
*** Exception: MyException "task1 blew up"

Example: parallelE_

> main2 :: IO ()
> main2 = do
>
>   x <- withPool 2 $ \pool -> parallelE_ pool [task1, task2]
>   print x

Output:

*Main> main2
[Just (MyException "task1 blew up"),Nothing]

Example: parallel

> main3 :: IO ()
> main3 = do
>   x <- withPool 2 $ \pool -> parallel pool [task1, task2]
>   print x

Output:

*Main> main3
*** Exception: MyException "task1 blew up"

Example: parallelE

> main4 :: IO ()
> main4 = do
>   x <- withPool 2 $ \pool -> parallelE pool [task1, task2]
>   print x

Output:

*Main> main4
[Left (MyException "task1 blew up"),Right "task2 finished"]
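For scenario 2 it is handy to split what parallelE returns into failures and successes. A sketch using `partitionEithers` from Data.Either; `tasks` and `runAndSplit` are hypothetical, and the third task is an extra not present in the post. As far as I know parallelE keeps results in the order of the input list, so the successes come out in task order.

```haskell
import Control.Concurrent.ParallelIO.Local (parallelE, withPool)
import Control.Exception (SomeException, throwIO)
import Data.Either (partitionEithers)

-- Hypothetical tasks standing in for task1 and task2 above.
tasks :: [IO String]
tasks =
  [ throwIO (userError "task1 blew up")
  , return "task2 finished"
  , return "task3 finished"
  ]

-- Run every task, then separate the exceptions from the results.
runAndSplit :: IO ([SomeException], [String])
runAndSplit = do
  results <- withPool 2 $ \pool -> parallelE pool tasks
  return (partitionEithers results)

main :: IO ()
main = do
  (failures, successes) <- runAndSplit
  mapM_ (\e -> putStrLn ("task failed: " ++ show e)) failures
  print successes -- ["task2 finished","task3 finished"]
```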

Example: async/wait

> main5 :: IO ()
> main5 = do
>   a1 <- async task1
>   a2 <- async task2
>   result1 <- wait a1
>   result2 <- wait a2
>   print [result1, result2]

Output:

*Main> main5
*** Exception: MyException "task1 blew up"

Example: async/waitCatch

> main6 :: IO ()
> main6 = do
>   a1 <- async task1
>   a2 <- async task2
>   result1 <- waitCatch a1
>   result2 <- waitCatch a2
>   print [result1, result2]

Output:

*Main> main6
[Left (MyException "task1 blew up"),Right "task2 finished"]

Example: concurrently

> main7 :: IO ()
> main7 = do
>   result <- concurrently task1 task2
>   print result

Output:

*Main> main7
*** Exception: MyException "task1 blew up"
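According to the async documentation, if either action passed to concurrently throws, the other one is cancelled before the exception is rethrown. The sketch below checks this with a hypothetical `siblingFinished`: the slow sibling would set an IORef after 0.2 s, and we wait well past that before reading it.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (concurrently)
import Control.Exception (SomeException, throwIO, try)
import Data.IORef (newIORef, readIORef, writeIORef)

-- True if the slow action ran to completion even though its sibling threw.
siblingFinished :: IO Bool
siblingFinished = do
  finished <- newIORef False
  let failing = throwIO (userError "task1 blew up") :: IO String
      slow = do
        threadDelay 200000 -- 0.2 s
        writeIORef finished True
        return "task2 finished"
  r <- try (concurrently failing slow) :: IO (Either SomeException (String, String))
  putStrLn (either (\e -> "concurrently threw: " ++ show e) show r)
  -- Wait longer than slow needs; if it had survived it would have set the flag.
  threadDelay 400000
  readIORef finished

main :: IO ()
main = siblingFinished >>= print -- False
```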

Example: throwM in a conduit sink

> main8 :: IO ()
> main8 = do
>   sourceList [1..5] $$ (throwM $ MyException "main8 in conduit exploded")
>   print "this is never printed"

Output:

*Main> main8
*** Exception: MyException "main8 in conduit exploded"

Example: throwM in a conduit sink (on one value)

> main9 :: IO ()
> main9 = do
>
>   let foo x = if x == 3 then throwM $ MyException "got a 3 in main9"
>                         else print x
>
>   sourceList [1..5] $$ (mapM_ foo)
>   print "this is never printed"

The conduit processes values 1 and 2, throws an exception on 3, and never sees 4 and 5.

*Main> main9
1
2
*** Exception: MyException "got a 3 in main9"

Example: throwM/catchC

> main10 :: IO ()
> main10 = do
>
>   let foo x = if x == 3 then throwM $ MyException "got a 3 in main10"
>                         else print x
>
>   let sink = catchC (mapM_ foo)
>                     (\(e :: SomeException) -> mapM_ $ \x -> putStrLn $ "When processing " ++ show x ++ " caught exception: " ++ show e)
>
>   sourceList [1..5] $$ sink
>   print "main10 finished"

The output is not what I expected. Values 1 and 2 are processed as expected, then the 3 throws an exception, but catchC does not resume the original sink: the handler (the second argument to catchC) takes over and processes the rest of the values (4 and 5), and the value 3 itself is lost. So catchC can't be used to process a stream whose elements fail independently. You have to catch exceptions per element, before they bubble up to the conduit code.

*Main> main10
1
2
When processing 4 caught exception: MyException "got a 3 in main10"
When processing 5 caught exception: MyException "got a 3 in main10"
"main10 finished"

Example: catchAll in conduit

A combinator that runs an IO action and catches any exception:

> catchBlah :: Show a => (a -> IO ()) -> a -> IO ()
> catchBlah action = \x -> catchAll (action x)
>                                   (\(e :: SomeException) -> putStrLn $ "On value " ++ show x ++ " caught exception: " ++ show e)

Using catchBlah in the sink:

> main11 :: IO ()
> main11 = do
>
>   let foo x = if x == 3 then throwM $ MyException "got a 3 in main11"
>                         else print x
>
>   sourceList [1..5] $$ (mapM_ $ catchBlah foo)
>
>   print "main11 finished"

Now the conduit processes every value, because the exception is caught and dealt with at a lower level.

*Main> main11
1
2
On value 3 caught exception: MyException "got a 3 in main11"
4
5
"main11 finished"

Example: catchBlah’ in conduit

Now, suppose we have a few stages in the conduit and the first stage blows up. Use catchAll to catch the exception and return an IO (Maybe b) instead of an IO b:

> catchBlah' :: Show a => (a -> IO b) -> a -> IO (Maybe b)
> catchBlah' action = \x -> do
>   catchAll (action x >>= (return . Just)) $ \(e :: SomeException) -> do
>     putStrLn $ "On value " ++ show x ++ " caught exception: " ++ show e
>     return Nothing
>
> main12 :: IO ()
> main12 = do
>
>   let src = [1..5] :: [Int]
>
>   let stage1 x = do when (x == 3) $ throwM $ MyException "Got a 3 in stage1"
>                     putStrLn $ "First print: " ++ show x
>                     return x
>
>   sourceList src $$ (mapM $ catchBlah' stage1) =$= (mapM_ print)
>
>   print "main12 finished"

Output:

*Main> main12
First print: 1
Just 1
First print: 2
Just 2
On value 3 caught exception: MyException "Got a 3 in stage1"
Nothing
First print: 4
Just 4
First print: 5
Just 5
"main12 finished"

Example: catchBlah’ in conduit (tweaked)

Same as the previous example but with nicer printing in the sink:

> main13 :: IO ()
> main13 = do
>
>   let src = [1..5] :: [Int]
>
>   let stage1 x = do when (x == 3) $ throwM $ MyException "Got a 3 in stage1"
>                     putStrLn $ "First print: " ++ show x
>                     return x
>       stage2 x = case x of
>                       Just x' -> do putStrLn $ "Second print: " ++ show (x' + 1)
>                                     putStrLn ""
>                       Nothing -> do putStrLn $ "Second print got Nothing..."
>                                     putStrLn ""
>
>   sourceList src $$ (mapM $ catchBlah' stage1) =$= (mapM_ stage2)
>
>   print "main13 finished"

Output:

*Main> main13
First print: 1
Second print: 2

First print: 2
Second print: 3

On value 3 caught exception: MyException "Got a 3 in stage1"
Second print got Nothing...

First print: 4
Second print: 5

First print: 5
Second print: 6

"main13 finished"

Note to self: parallel-io

A short note about using parallel-io to run shell commands in parallel from Haskell. If you want to try out this blog post’s Literate Haskell source then your best bet is to compile in a sandbox which has various package versions fixed using the cabal.config file (via the cabal freeze command).

This is how to build the sandbox:

git clone https://github.com/carlohamalainen/playground.git
cd playground/haskell/parallel
rm -fr .cabal-sandbox cabal.sandbox.config dist # start fresh
cabal sandbox init
cabal install --haddock-hyperlink-source --dependencies-only
cabal install
cabal repl

Also, note the line

  ghc-options:         -threaded -rtsopts -with-rtsopts=-N

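in parallel.cabal. The -threaded flag links the threaded runtime, which is needed to run Haskell threads on more than one core, and -with-rtsopts=-N makes -N (use all available cores) the default. Without -with-rtsopts=-N you would have to execute the binary using ./P +RTS -N, which -rtsopts permits.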
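To check at run time whether these options took effect, a small sketch (not part of the original post) using two functions from Control.Concurrent:

```haskell
import Control.Concurrent (getNumCapabilities, rtsSupportsBoundThreads)

main :: IO ()
main = do
  -- True only when the program was linked with -threaded.
  putStrLn ("threaded RTS: " ++ show rtsSupportsBoundThreads)
  -- 1 by default; with a bare -N (via -with-rtsopts=-N or +RTS -N)
  -- it is the number of cores the RTS detected.
  n <- getNumCapabilities
  putStrLn ("capabilities: " ++ show n)
```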

Now, onto the actual blog post. First, a few imports to get us going.

> module Main where
> import           Control.Concurrent
> import           Control.Monad
> import           Control.Concurrent.ParallelIO.Local
> import           Data.Traversable
> import qualified Pipes.ByteString                           as B
> import qualified Data.ByteString                            as BS
> import qualified Data.ByteString.Lazy                       as BSL
> import           Data.ByteString.Internal (w2c)
> import           System.Exit
> import           System.IO
> import           System.Process.Streaming

In one of my work projects I often need to call legacy command line tools to process various imaging formats (DICOM, MINC, Nifti, etc.). I used to use a plain call to createProcess and then readRestOfHandle to read stdout and stderr, but I discovered that this can deadlock (typically the child blocks writing to a full stderr pipe while we are still waiting on stdout), and a better approach is to use process-streaming. This is the current snippet that I use:

> -- Copied from https://github.com/carlohamalainen/imagetrove-uploader/blob/master/src/Network/ImageTrove/Utils.hs
> -- Run a shell command, returning Right with stdout if the command exited successfully
> -- and Left with stderr if there was an exit failure.
> runShellCommand :: FilePath -> [String] -> IO (Either String String)
> runShellCommand cmd args = do
>
>     (exitCode, (stdOut, stdErr)) 
>     return $ case exitCode of
>         ExitSuccess   -> Right $ map w2c $ BS.unpack $ BSL.toStrict stdOut
>         ExitFailure e -> Left $ "runShellCommand: exit status " ++ show e ++ " with stdErr: "
>                                                                 ++ (map w2c $ BS.unpack $ BSL.toStrict $ stdErr)
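If process-streaming isn't an option, one alternative (not what the post uses) is readProcessWithExitCode from the process package, which reads stdout and stderr in separate threads so neither pipe can fill up and stall the child. A sketch with the same Either contract as runShellCommand; `runShellCommand'` is a hypothetical name, and unlike the exit-code failures, a missing executable still raises an IOException rather than returning a Left.

```haskell
import System.Exit (ExitCode (..))
import System.Process (readProcessWithExitCode)

-- Right stdout on success, Left with the exit status and stderr otherwise.
-- readProcessWithExitCode drains stdout and stderr concurrently.
runShellCommand' :: FilePath -> [String] -> IO (Either String String)
runShellCommand' cmd args = do
  (exitCode, out, err) <- readProcessWithExitCode cmd args "" -- "" is stdin
  return $ case exitCode of
    ExitSuccess   -> Right out
    ExitFailure e -> Left ("runShellCommand': exit status " ++ show e ++ " with stdErr: " ++ err)

main :: IO ()
main = do
  runShellCommand' "echo" ["hello"] >>= print                     -- Right "hello\n"
  runShellCommand' "sh" ["-c", "echo oops >&2; exit 3"] >>= print
```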

Suppose we have a shell command that takes a while, in this case because it’s sleeping. Pretend that it’s IO bound.

> longShellCommand :: Int -> IO (Either String String)
> longShellCommand n = do
>   putStrLn $ "Running sleep command for " ++ show n ++ " second(s)."
>   runShellCommand "sleep" [show n ++ "s"]

We could run them in order:

> main1 :: IO ()
> main1 = do
>   -- Think of these as arguments to our long-running commands.
>   let sleepTimes = [1, 1, 1, 1]
>
>   forM_ sleepTimes longShellCommand

In Haskell we can think of IO as a data type that describes an IO action, so we can build up IO actions using ‘pure’ code and execute them later. To make this a bit more explicit, here is a function for running an IO action:

> runIO :: IO a -> IO a
> runIO x = do
>   result <- x
>   return result

We can use it like this:

*Main> let action = print 3 -- pure code, nothing happens yet
*Main> runIO action         -- runs the action
3

And we can rewrite main1 like this:

> main2 :: IO ()
> main2 = do
>   let sleepTimes = [1, 1, 1, 1]
>
>   let actions = map longShellCommand sleepTimes :: [IO (Either String String)]
>
>   forM_ actions runIO

As an aside, runIO is equivalent to liftM id (see Control.Monad for info about liftM).
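A small sketch to check this, and that building an action does not run it: a hypothetical counter in an IORef is bumped each time the action actually runs, whether via runIO, liftM id, or plain id.

```haskell
import Control.Monad (liftM)
import Data.IORef (modifyIORef, newIORef, readIORef)

runIO :: IO a -> IO a
runIO x = do
  result <- x
  return result

-- The counter before anything runs, then after each of three runs.
demo :: IO (Int, Int, Int, Int)
demo = do
  counter <- newIORef 0
  -- Defining bump runs nothing; it is just a value of type IO Int.
  let bump = modifyIORef counter (+ 1) >> readIORef counter
  before <- readIORef counter
  a <- runIO bump
  b <- liftM id bump
  c <- id bump
  return (before, a, b, c)

main :: IO ()
main = demo >>= print -- (0,1,2,3)
```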

Now, imagine that you had a lot of these shell commands to execute and wanted a pool of, say, 4 workers. The parallel-io package provides withPool which can be used like this:

withPool 4 $ \pool -> parallel_ pool [putStrLn "Echo", putStrLn " in parallel"]

Note that the IO actions (the putStrLn fragments) are provided in a list. A list of IO actions. So we can run our shell commands in parallel like so:

> main3 :: IO ()
> main3 = do
>   let sleepTimes = [1, 1, 1, 1]
>
>   let actions = map longShellCommand sleepTimes :: [IO (Either String String)]
>
>   hSetBuffering stdout LineBuffering -- Otherwise output is garbled.
>
>   withPool 4 $ \pool -> parallel_ pool actions

If we did this a lot we might define our own version of forM_ that uses withPool:

> parForM_ :: Int -> [IO a] -> IO ()
> parForM_ nrWorkers tasks = withPool nrWorkers $ \pool -> parallel_ pool tasks
>
> main4 :: IO ()
> main4 = do
>   let sleepTimes = [1, 1, 1, 1]
>
>   let actions = map longShellCommand sleepTimes :: [IO (Either String String)]
>
>   hSetBuffering stdout LineBuffering -- Otherwise output is garbled.
>
>   parForM_ 4 actions
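If the results are needed as well, a hypothetical parForM can use parallel instead of parallel_. As I understand the parallel-io docs, parallel returns results in the order of the input list, and (as main3 in the exceptions note shows) it rethrows if any task throws.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.ParallelIO.Local (parallel, withPool)

-- Like parForM_, but keeps each task's result (hypothetical helper).
parForM :: Int -> [IO a] -> IO [a]
parForM nrWorkers tasks = withPool nrWorkers $ \pool -> parallel pool tasks

-- Eight 0.1 s tasks on 4 workers; results come back in input order.
squares :: IO [Int]
squares = parForM 4 [threadDelay 100000 >> return (n * n) | n <- [1 .. 8]]

main :: IO ()
main = squares >>= print -- [1,4,9,16,25,36,49,64]
```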

Here is another example of building up some IO actions in pure form and then executing them later. Imagine that instead of a list of Ints for the sleep times, we have some actual sleep times and others that represent an error case. An easy way to model this is using Either, which by convention has the erroneous values in the Left and correct values in the Right.

> main5 :: IO ()
> main5 = do
>   let sleepTimes = [Right 1, Left "something went wrong", Right 2, Right 3]
>
>   let actions  = map (traverse longShellCommand) sleepTimes :: [IO (Either [Char] (Either String String))]
>       actions' = map (fmap join) actions                    :: [IO (Either [Char] String)]
>
>   hSetBuffering stdout LineBuffering -- Otherwise output is garbled.
>
>   parForM_ 4 actions'

In main5 we define actions by mapping a function over the sleep times, which are now of type Either String Int. We can’t apply longShellCommand directly because it expects an Int, so we use traverse longShellCommand instead (see Data.Traversable for the definition of traverse). For a Right n this runs longShellCommand n and wraps its result in Right; for a Left it runs nothing and just returns the Left.

Next, the Either-of-Either is a bit clunky but we can mash the two layers together using join. We have to use fmap because the list elements have type IO (Either [Char] (Either String String)), not Either [Char] (Either String String) as join expects; fmap join applies join to the value inside the IO.
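The traverse and join steps can be tried without running sleep. A sketch with a hypothetical fakeCommand standing in for longShellCommand, which fails for inputs above 2 so that both kinds of Left show up:

```haskell
import Control.Monad (join)

-- Hypothetical stand-in for longShellCommand: no process is spawned.
fakeCommand :: Int -> IO (Either String String)
fakeCommand n
  | n > 2     = return (Left ("too long: " ++ show n))
  | otherwise = return (Right ("slept " ++ show n))

runAll :: IO [Either String String]
runAll = do
  let sleepTimes = [Right 1, Left "something went wrong", Right 3] :: [Either String Int]
      -- traverse runs fakeCommand on a Right; a Left is returned unchanged.
      actions  = map (traverse fakeCommand) sleepTimes :: [IO (Either String (Either String String))]
      -- fmap reaches inside IO, and join flattens the nested Either.
      actions' = map (fmap join) actions :: [IO (Either String String)]
  sequence actions'

main :: IO ()
main = runAll >>= print
-- [Right "slept 1",Left "something went wrong",Left "too long: 3"]
```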

One topic that I haven’t touched on is dealing with asynchronous exceptions. For this, have a read of Catching all exceptions from Snoyman and also enclosed-exceptions. Also, Chapter 13 of Parallel and Concurrent Programming in Haskell shows how to use the handy async package.
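Roughly, the concern is that a catch-all handler should not swallow asynchronous exceptions such as the ThreadKilled or cancellation exceptions that killThread and async's cancel deliver. A minimal sketch of one type-based approach using only base, with a hypothetical tryAnySync; this is not the API of enclosed-exceptions, which takes a different route.

```haskell
import Control.Exception

-- Hypothetical helper: like try at SomeException, but asynchronous
-- exceptions are rethrown so the thread still dies when asked to.
tryAnySync :: IO a -> IO (Either SomeException a)
tryAnySync action = do
  r <- try action
  case r of
    Left e | isAsync e -> throwIO e
    _                  -> return r
  where
    isAsync e = case fromException e :: Maybe SomeAsyncException of
      Just _  -> True
      Nothing -> False

-- True if ThreadKilled escapes tryAnySync instead of becoming a Left.
asyncIsRethrown :: IO Bool
asyncIsRethrown = do
  r <- try (tryAnySync (throwIO ThreadKilled :: IO ()))
  return $ case r of
    Left ThreadKilled -> True
    _                 -> False

main :: IO ()
main = do
  -- A synchronous exception is caught and returned as a Left.
  r1 <- tryAnySync (evaluate (1 `div` (0 :: Int)))
  putStrLn (either (\e -> "caught: " ++ show e) show r1)
  asyncIsRethrown >>= print -- True
```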

> -- Run all of the mains.
> main :: IO ()
> main = do
>
>   print "main1"
>   main1
>
>   print "main2"
>   main2
>
>   print "main3"
>   main3
>
>   print "main4"
>   main4
>
>   print "main5"
>   main5

Archived Comments

Date: 2015-05-26 01:49:57.735584 UTC

Author: Franklin Chen

A side note: `runShellCommand` does not seem to play well with Unicode, because it does not do decoding of bytes, e.g.,

> runShellCommand "ls" ["écoutez"]
Left "runShellCommand: exit status 1 with stdErr: ls: \195\169coutez: No such file or directory\n"

Note that I have LANG=en_US.UTF-8 set on my Mac.