Note to self on catching exceptions in multithreaded Haskell code. Literate Haskell source and build scripts and cabal stuff is at https://github.com/carlohamalainen/playground/tree/master/haskell/exceptions-in-parallel.

For my use cases there are two scenarios when running a list of worker threads:

  1. If any thread throws an exception, give up on everything.

  2. If any thread throws an exception, log it, but let the other workers run to completion.

First, imports that we’ll use:

{-# LANGUAGE DeriveDataTypeable, ScopedTypeVariables #-}

module Main where

import Data.Conduit
import Data.Conduit.List
import Data.Traversable (traverse)
import Control.Applicative
import Control.Monad.Catch
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.ParallelIO.Local
import Control.Monad hiding (mapM, mapM_)
import Control.Monad.Catch
import Data.Typeable
import Prelude hiding (map, mapM, mapM_)
import System.IO

We will use code from parallel-io and async for running worker threads. For a pipeline we’ll also use conduit.

Here’s our exception type, which we throw using throwM from Control.Monad.Catch:

data MyException = MyException String deriving (Show, Typeable)

instance Exception MyException

Our two tasks. The first task immediately throws an exception; the second waits for 5 seconds and completes happily.

task1 :: IO String
task1 = throwM $ MyException "task1 blew up"

task2 :: IO String
task2 = do
  threadDelay $ 5 * 10^6
  return $ "task2 finished"

Example: parallel_ Link to heading

main1 :: IO ()
main1 = do

  x <- withPool 2 $ \pool -> parallel_ pool [task1, task2]
  print (x :: ())

Output:

*Main> main1
*** Exception: MyException "task1 blew up"

Example: parallelE_ Link to heading

main2 :: IO ()
main2 = do

  x <- withPool 2 $ \pool -> parallelE_ pool [task1, task2]
  print x

Output:

*Main> main2
[Just (MyException "task1 blew up"),Nothing]

Example: parallel Link to heading

main3 :: IO ()
main3 = do
  x <- withPool 2 $ \pool -> parallel pool [task1, task2]
  print x

Output:

*Main> main3
*** Exception: MyException "task1 blew up"

Example: parallelE Link to heading

main4 :: IO ()
main4 = do
  x <- withPool 2 $ \pool -> parallelE pool [task1, task2]
  print x

Output:

*Main> main4
[Left (MyException "task1 blew up"),Right "task2 finished"]

Example: async/wait Link to heading

main5 :: IO ()
main5 = do
  a1 <- async task1
  a2 <- async task2
  result1 <- wait a1
  result2 <- wait a2

  print [result1, result2]

Output:

*Main> main5
*** Exception: MyException "task1 blew up"

Example: async/waitCatch Link to heading

main6 :: IO ()
main6 = do
  a1 <- async task1
  a2 <- async task2
  result1 <- waitCatch a1
  result2 <- waitCatch a2

  print [result1, result2]

Output:

*Main> main6
[Left (MyException "task1 blew up"),Right "task2 finished"]

Example: concurrently Link to heading

main7 :: IO ()
main7 = do
  result <- concurrently task1 task2

  print result

Output:

*Main> main7
*** Exception: MyException "task1 blew up"

Example: throwM in a conduit sink Link to heading

main8 :: IO ()
main8 = do
  sourceList [1..5] $$ (throwM $ MyException "main8 in conduit exploded")
  print "this is never printed"

Output:

*** Exception: MyException "main8 in conduit exploded"

Example: throwM in a conduit sink (on one value) Link to heading

main9 :: IO ()
main9 = do

  let foo x = if x == 3 then throwM $ MyException "got a 3 in main9"
                        else print x

  sourceList [1..5] $$ (mapM_ foo)
  print "this is never printed"

The conduit processes values 1 and 2, throws an exception on 3, and never sees 4 and 5.

*Main> main9
1
2
*** Exception: MyException "got a 3 in main9"

Example: throwM/catchC Link to heading

main10 :: IO ()
main10 = do

  let foo x = if x == 3 then throwM $ MyException "got a 3 in main10"
                        else print x

  let sink = catchC (mapM_ foo)
                    (\(e :: SomeException) -> mapM_ $ \x -> putStrLn $ "When processing " ++ show x ++ " caught exception: " ++ show e)

  sourceList [1..5] $$ sink
  print "main10 finished"

The output is not what I expected. Values 1 and 2 are processed as expected, then the 3 throws an exception, but the effect of catchC is that the rest of the values (4 and 5) are processed using the second argument to catchC. In this situation, a conduit can’t be used to process a stream with independently failing components. You have to catch all exceptions before they bubble up to the conduit code.

1
2
When processing 4 caught exception: MyException "got a 3 in main10"
When processing 5 caught exception: MyException "got a 3 in main10"
"main10 finished"

Example: catchAll in conduit Link to heading

A combinator that runs an IO action and catches any exception:

catchBlah :: Show a => (a -> IO ()) -> a -> IO ()
catchBlah action = \x -> catchAll (action x)
                                  (\(e :: SomeException) -> putStrLn $ "On value " ++ show x ++ " caught exception: " ++ show e)

Using catchBlah in the sink:

main11 :: IO ()
main11 = do

  let foo x = if x == 3 then throwM $ MyException "got a 3 in main11"
                        else print x

  sourceList [1..5] $$ (mapM_ $ catchBlah foo)

  print "main11 finished"

Now the conduit processes every value, because the exception is caught and dealt with at a lower level.

*Main> main11
1
2
On value 3 caught exception: MyException "got a 3 in main11"
4
5
"main11 finished"

Example: catchBlah’ in conduit Link to heading

Now, suppose we have a few stages in the conduit and the first stage blows up. Use catchAll to catch the exception and return a IO (Maybe b) instead of IO b:

catchBlah' :: Show a => (a -> IO b) -> a -> IO (Maybe b)
catchBlah' action = \x -> do
  catchAll (action x >>= (return . Just))
           (\(e :: SomeException) -> do putStrLn $ "On value " ++ show x ++ " caught exception: " ++ show e
                                        return Nothing)

main12 :: IO ()
main12 = do

  let src = [1..5] :: [Int]

  let stage1 x = do when (x == 3) $ throwM $ MyException "Got a 3 in stage1"
                    putStrLn $ "First print: " ++ show x
                    return x

  sourceList src $$ (mapM $ catchBlah' stage1) =$= (mapM_ print)

  print "main12 finished"

Output:

First print: 1
Just 1
First print: 2
Just 2
On value 3 caught exception: MyException "Got a 3 in stage1"
Nothing
First print: 4
Just 4
First print: 5
Just 5
"main12 finished"

Example: catchBlah’ in conduit (tweaked) Link to heading

Same as the previous example but with nicer printing in the sink:

main13 :: IO ()
main13 = do

  let src = [1..5] :: [Int]

  let stage1 x = do when (x == 3) $ throwM $ MyException "Got a 3 in stage1"
                    putStrLn $ "First print: " ++ show x
                    return x
      stage2 x = case x of
                      Just x' -> do putStrLn $ "Second print: " ++ show (x' + 1)
                                    putStrLn ""
                      Nothing -> do putStrLn $ "Second print got Nothing..."
                                    putStrLn ""

  sourceList src $$ (mapM $ catchBlah' stage1) =$= (mapM_ stage2)

  print "main13 finished"

Output:

*Main> main13
First print: 1
Second print: 2

First print: 2
Second print: 3

On value 3 caught exception: MyException "Got a 3 in stage1"
Second print got Nothing...

First print: 4
Second print: 5

First print: 5
Second print: 6

"main13 finished"