Note to self: exceptions in multithreaded Haskell

Note to self on catching exceptions in multithreaded Haskell code. Literate Haskell source and build scripts and cabal stuff is at https://github.com/carlohamalainen/playground/tree/master/haskell/exceptions-in-parallel.

For my use cases there are two scenarios when running a list of worker threads:

  1. If any thread throws an exception, give up on everything.
  2. If any thread throws an exception, log it, but let the other workers run to completion.

First, imports that we’ll use:

> {-# LANGUAGE DeriveDataTypeable, ScopedTypeVariables #-}
>
> module Main where
>
> import Data.Conduit
> import Data.Conduit.List
> import Data.Traversable (traverse)
> import Control.Applicative
> import Control.Monad.Catch
> import Control.Concurrent
> import Control.Concurrent.Async
> import Control.Concurrent.ParallelIO.Local
> import Control.Monad hiding (mapM, mapM_)
> import Control.Monad.Catch
> import Data.Typeable
> import Prelude hiding (map, mapM, mapM_)
> import System.IO

We will use code from parallel-io and async for running worker threads. For a pipeline we’ll also use conduit.

Here’s our exception type, which we throw using throwM from Control.Monad.Catch:

> data MyException = MyException String deriving (Show, Typeable)
>
> instance Exception MyException

Our two tasks. The first task immediately throws an exception; the second waits for 5 seconds and completes happily.

> task1 :: IO String
> task1 = throwM $ MyException "task1 blew up"
>
> task2 :: IO String
> task2 = do
>   threadDelay $ 5 * 10^6
>   return $ "task2 finished"

Example: parallel_

> main1 :: IO ()
> main1 = do
>
>   x  parallel_ pool [task1, task2]
>   print (x :: ())

Output:

*Main> main1
*** Exception: MyException "task1 blew up"

Example: parallelE_

> main2 :: IO ()
> main2 = do
>
>   x  parallelE_ pool [task1, task2]
>   print x

Output:

*Main> main2
[Just (MyException "task1 blew up"),Nothing]

Example: parallel

> main3 :: IO ()
> main3 = do
>   x  parallel pool [task1, task2]
>   print x

Output:

*Main> main3
*** Exception: MyException "task1 blew up"

Example: parallelE

> main4 :: IO ()
> main4 = do
>   x  parallelE pool [task1, task2]
>   print x

Output:

*Main> main4
[Left (MyException "task1 blew up"),Right "task2 finished"]

Example: async/wait

> main5 :: IO ()
> main5 = do
>   a1    a2    result1    result2 
>   print [result1, result2]

Output:

*Main> main5
*** Exception: MyException "task1 blew up"

Example: async/waitCatch

> main6 :: IO ()
> main6 = do
>   a1    a2    result1    result2 
>   print [result1, result2]

Output:

*Main> main6
[Left (MyException "task1 blew up"),Right "task2 finished"]

Example: concurrently

> main7 :: IO ()
> main7 = do
>   result 
>   print result

Output:

*Main> main7
*** Exception: MyException "task1 blew up"

Example: throwM in a conduit sink

> main8 :: IO ()
> main8 = do
>   sourceList [1..5] $$ (throwM $ MyException "main8 in conduit exploded")
>   print "this is never printed"

Output:

*** Exception: MyException "main8 in conduit exploded"

Example: throwM in a conduit sink (on one value)

> main9 :: IO ()
> main9 = do
>
>   let foo x = if x == 3 then throwM $ MyException "got a 3 in main9"
>                         else print x
>
>   sourceList [1..5] $$ (mapM_ foo)
>   print "this is never printed"

The conduit processes values 1 and 2, throws an exception on 3, and never sees 4 and 5.

*Main> main9
1
2
*** Exception: MyException "got a 3 in main9"

Example: throwM/catchC

> main10 :: IO ()
> main10 = do
>
>   let foo x = if x == 3 then throwM $ MyException "got a 3 in main10"
>                         else print x
>
>   let sink = catchC (mapM_ foo)
>                     ((e :: SomeException) -> mapM_ $ x -> putStrLn $ "When processing " ++ show x ++ " caught exception: " ++ show e)
>
>   sourceList [1..5] $$ sink
>   print "main10 finished"

The output is not what I expected. Values 1 and 2 are processed as expected, then the 3 throws an exception, but the effect of catchC is that the rest of the values (4 and 5) are processed using the second argument to catchC. In this situation, a conduit can’t be used to process a stream with independently failing components. You have to catch all exceptions before they bubble up to the conduit code.

1
2
When processing 4 caught exception: MyException "got a 3 in main10"
When processing 5 caught exception: MyException "got a 3 in main10"
"main10 finished"

Example: catchAll in conduit

A combinator that runs an IO action and catches any exception:

> catchBlah :: Show a => (a -> IO ()) -> a -> IO ()
> catchBlah action = x -> catchAll (action x)
>                                   ((e :: SomeException) -> putStrLn $ "On value " ++ show x ++ " caught exception: " ++ show e)

Using catchBlah in the sink:

> main11 :: IO ()
> main11 = do
>
>   let foo x = if x == 3 then throwM $ MyException "got a 3 in main11"
>                         else print x
>
>   sourceList [1..5] $$ (mapM_ $ catchBlah foo)
>
>   print "main11 finished"

Now the conduit processes every value, because the exception is caught and dealt with at a lower level.

*Main> main11
1
2
On value 3 caught exception: MyException "got a 3 in main11"
4
5
"main11 finished"

Example: catchBlah’ in conduit

Now, suppose we have a few stages in the conduit and the first stage blows up. Use catchAll to catch the exception and return a IO (Maybe b) instead of IO b:

> catchBlah' :: Show a => (a -> IO b) -> a -> IO (Maybe b)
> catchBlah' action = x -> do
>   catchAll (action x >>= (return . Just))
>            ((e :: SomeException) -> do putStrLn $ "On value " ++ show x ++ " caught exception: " ++ show e
>                                         return Nothing)
> main12 :: IO ()
> main12 = do
>
>   let src = [1..5] :: [Int]
>
>   let stage1 x = do when (x == 3) $ throwM $ MyException "Got a 3 in stage1"
>                     putStrLn $ "First print: " ++ show x
>                     return x
>
>   sourceList src $$ (mapM $ catchBlah' stage1) =$= (mapM_ print)
>
>   print "main12 finished"

Output:

First print: 1
Just 1
First print: 2
Just 2
On value 3 caught exception: MyException "Got a 3 in stage1"
Nothing
First print: 4
Just 4
First print: 5
Just 5
"main12 finished"

Example: catchBlah’ in conduit (tweaked)

Same as the previous example but with nicer printing in the sink:

> main13 :: IO ()
> main13 = do
>
>   let src = [1..5] :: [Int]
>
>   let stage1 x = do when (x == 3) $ throwM $ MyException "Got a 3 in stage1"
>                     putStrLn $ "First print: " ++ show x
>                     return x
>       stage2 x = case x of
>                       Just x' -> do putStrLn $ "Second print: " ++ show (x' + 1)
>                                     putStrLn ""
>                       Nothing -> do putStrLn $ "Second print got Nothing..."
>                                     putStrLn ""
>
>   sourceList src $$ (mapM $ catchBlah' stage1) =$= (mapM_ stage2)
>
>   print "main13 finished"

Output:

*Main> main13
First print: 1
Second print: 2

First print: 2
Second print: 3

On value 3 caught exception: MyException "Got a 3 in stage1"
Second print got Nothing...

First print: 4
Second print: 5

First print: 5
Second print: 6

"main13 finished"

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s