How to Use Python to Raise a Dummy Exception

You must handle exceptions when using the ThreadPoolExecutor in Python.

Exceptions may be raised when initializing worker threads, in target task threads, and in callback functions once tasks are completed.

In this tutorial, you will discover how to handle exceptions in a Python thread pool.

Let’s get started.

  • ThreadPoolExecutor Exception Handling
  • Exception Handling in Thread Initialization
  • Exception Handling in Task Execution
    • Exception Handling Within the Task
    • Exception Handling Outside the Task with submit()
    • Exception Handling Outside the Task with map()
  • Exception Handling in Callbacks
  • Further Reading
  • Takeaways

ThreadPoolExecutor Exception Handling

Exception handling is an important consideration when using threads.

Code will raise an exception when something unexpected happens and the exception should be dealt with by your application explicitly, even if it means logging it and moving on.

Python threads are well suited for use with IO-bound tasks, and operations within these tasks often raise exceptions, such as if a server cannot be reached, if the network goes down, if a file cannot be found, and so on.

There are three points you may need to consider with regard to exception handling when using the ThreadPoolExecutor:

  • Thread Initialization
  • Task Execution
  • Task Completion Callbacks

Let’s take a closer look at each point in turn.

Exception Handling in Thread Initialization

You can specify a custom initialization function when configuring your ThreadPoolExecutor.

This can be set via the initializer argument to specify the function name and initargs to specify a tuple of arguments to the function.
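
As a minimal sketch of how these two arguments fit together, the snippet below passes a single value to each worker thread through initargs and stores it in thread-local storage. The names worker_state, init_worker, and run_demo are hypothetical and chosen only for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# thread-local storage so each worker thread keeps its own copy of the state
worker_state = threading.local()

# hypothetical initializer: called once in each new worker thread
def init_worker(prefix):
    worker_state.prefix = prefix

# task that reads the value stored by the initializer
def task(identifier):
    return f'{worker_state.prefix}-{identifier}'

def run_demo():
    # initargs must be a tuple, even for a single argument
    with ThreadPoolExecutor(max_workers=2, initializer=init_worker,
                            initargs=('job',)) as executor:
        return list(executor.map(task, range(3)))

if __name__ == '__main__':
    print(run_demo())
```

Thread-local storage is one possible choice here; it keeps per-worker state separate without passing it to every task call.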

Each worker thread started by the thread pool will call your initialization function once, before it begins executing tasks.

If your initialization function raises an exception, it will break your thread pool.

All current tasks and any future tasks executed by the thread pool will not run and will raise a BrokenThreadPool exception.
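
A caller that wants to react to a broken pool can catch this exception where tasks are submitted or where results are retrieved. The sketch below is one way this might look; failing_initializer and run_demo are hypothetical names, and the initializer failure is still logged to the console by the thread pool.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures.thread import BrokenThreadPool

# hypothetical initializer that always fails
def failing_initializer():
    raise RuntimeError('Initialization failed')

def task(identifier):
    return identifier

def run_demo():
    with ThreadPoolExecutor(max_workers=1,
                            initializer=failing_initializer) as executor:
        try:
            # the error may surface on submit() or on result()
            future = executor.submit(task, 1)
            return future.result()
        except BrokenThreadPool as exc:
            return f'Pool is broken: {exc}'

if __name__ == '__main__':
    print(run_demo())
```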

We can demonstrate this with an example of a contrived initializer function that raises an exception.

# SuperFastPython.com
# example of an exception in a thread pool initializer function
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# function for initializing the worker thread
def initializer_worker():
    # raises an exception
    raise Exception('Something bad happened!')

# a mock task that sleeps for a random amount of time less than one second
def task(identifier):
    sleep(random())
    # return the identifier that was passed in
    return identifier

# create a thread pool
with ThreadPoolExecutor(max_workers=2, initializer=initializer_worker) as executor:
    # execute tasks
    for result in executor.map(task, range(10)):
        print(result)

Running the example fails with an exception, as we expected.

The thread pool is created as per normal, but as soon as we try to execute tasks, new worker threads are created and the custom worker thread initialization function is called and raises an exception.

Multiple threads attempted to start, and in turn, multiple threads failed with an Exception. Finally, the thread pool itself logged a message that the pool is broken and cannot be used any longer.

Exception in initializer:
Traceback (most recent call last):
...
raise Exception('Something bad happened!')
Exception: Something bad happened!
Exception in initializer:
Traceback (most recent call last):
...
raise Exception('Something bad happened!')
Exception: Something bad happened!
Traceback (most recent call last):
...
concurrent.futures.thread.BrokenThreadPool: A thread initializer failed, the thread pool is not usable anymore

This highlights that if you use a custom initializer function, you must carefully consider the exceptions that may be raised and perhaps handle them; otherwise, you put at risk all tasks that depend on the thread pool.
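
One way to handle such exceptions is to catch them inside the initializer and fall back to a safe default, so the pool stays usable. The following is a sketch under that assumption; risky_setup, safe_initializer, and the fallback value of None are hypothetical and stand in for whatever setup your workers actually need.

```python
import logging
import threading
from concurrent.futures import ThreadPoolExecutor

# thread-local storage for per-worker state
worker_state = threading.local()

# hypothetical setup step that fails, e.g. a connection that cannot be opened
def risky_setup():
    raise RuntimeError('Could not set up the worker')

# initializer that handles its own exceptions so the pool is not broken
def safe_initializer():
    try:
        worker_state.resource = risky_setup()
    except Exception:
        # log the failure and continue with a fallback value
        logging.exception('Worker initialization failed, using fallback')
        worker_state.resource = None

def task(identifier):
    if worker_state.resource is None:
        return f'task {identifier} ran without the resource'
    return f'task {identifier} used the resource'

def run_demo():
    with ThreadPoolExecutor(max_workers=2,
                            initializer=safe_initializer) as executor:
        return list(executor.map(task, range(3)))

if __name__ == '__main__':
    for line in run_demo():
        print(line)
```

Whether a fallback is acceptable depends on the application; in some cases letting the pool break may be the safer signal.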

Exception Handling in Task Execution

An exception may occur while executing your task.

This will cause the task to stop executing, but will not break the thread pool. Instead, the exception will be caught by the thread pool and will be available via the Future object associated with the task via the exception() function.

Alternatively, the exception will be re-raised if you call result() on the Future in order to get the result. This applies to tasks added to the thread pool with both submit() and map().

It means that you have two options for handling exceptions in tasks; they are:

  1. Handle exceptions within the task function.
  2. Handle exceptions when getting results from tasks.

Exception Handling Within the Task

Handling the exception within the task means that you need some mechanism to let the recipient of the result know that something unexpected happened.

This could be via the return value from the function, e.g. None.

Alternatively, you can re-raise an exception and have the recipient handle it directly. A third option might be to use some broader state or global state, perhaps passed by reference into the call to the function.

The example below defines a work task that will raise an exception but will catch the exception and return a result indicating a failure case.

# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# mock task that will sleep for a moment
def work():
    sleep(1)
    try:
        raise Exception('Something bad happened!')
    except Exception:
        # report the failure via the return value
        return 'Unable to get the result'
    return "never gets here"

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute our task
    future = executor.submit(work)
    # get the result from the task
    result = future.result()
    print(result)

Running the example starts the thread pool as per normal, issues the task, then blocks waiting for the result.

The task raises an exception, catches it internally, and returns an error message as its result.

This approach is reasonably clean for the recipient code and would be appropriate for tasks issued by both submit() and map(). It may require special handling of a custom return value for the failure case.
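
As an illustration of such a custom return value, the sketch below has each task return a (success, payload) tuple so the recipient can tell failures apart from real results when using map(). The tuple convention and the names work and run_demo are assumptions made for this example, not part of the ThreadPoolExecutor API.

```python
from concurrent.futures import ThreadPoolExecutor

# task that reports failure through its return value: (success, payload)
def work(value):
    try:
        # contrived failure for odd inputs
        if value % 2 == 1:
            raise ValueError(f'Cannot process {value}')
        return (True, value * 10)
    except Exception as exc:
        return (False, str(exc))

def run_demo():
    with ThreadPoolExecutor() as executor:
        # map() yields results in the order the tasks were issued
        return list(executor.map(work, range(4)))

if __name__ == '__main__':
    for ok, payload in run_demo():
        print('result:' if ok else 'failed:', payload)
```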

Exception Handling Outside the Task with submit()

An alternative to handling the exception in the task is to leave the responsibility to the recipient of the result.

This may feel like a more natural solution, as it matches the synchronous version of the same operation, e.g. if we were performing the function call in a for loop.

It means that the recipient must be aware of the types of errors that the task may raise and handle them explicitly.

The example below defines a simple task that raises an Exception, which is then handled by the recipient when attempting to get the result from the function call.

# SuperFastPython.com
# example of handling an exception when getting the result of a task
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# mock task that will sleep for a moment
def work():
    sleep(1)
    raise Exception('Something bad happened!')
    return "never gets here"

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute our task
    future = executor.submit(work)
    # get the result from the task
    try:
        result = future.result()
    except Exception:
        print('Unable to get the result')

Running the example creates the thread pool and submits the work as per normal. The task fails with an Exception, the thread pool catches the exception, stores it, then re-raises it when we call the result() function on the Future.

The recipient of the result accepts the exception and catches it, reporting a failure case.
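
The same pattern extends to many tasks. The sketch below, which assumes a contrived work() function that fails for one input, submits several tasks and handles each result or exception as the tasks complete using as_completed().

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# contrived task that fails for one specific input
def work(value):
    if value == 2:
        raise ValueError(f'Bad input: {value}')
    return value * value

def run_demo():
    results, failures = {}, {}
    with ThreadPoolExecutor() as executor:
        # map each future back to the input it was created for
        futures = {executor.submit(work, v): v for v in range(4)}
        for future in as_completed(futures):
            value = futures[future]
            try:
                results[value] = future.result()
            except ValueError as exc:
                # one failed task does not stop the others
                failures[value] = str(exc)
    return results, failures

if __name__ == '__main__':
    print(run_demo())
```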

We can also check for the exception directly via a call to the exception() function on the Future object. This function blocks until the task is done and takes an optional timeout, just like a call to result().

If the task completes successfully, then exception() will return a value of None. If the task was cancelled, then exception() will raise a CancelledError.

We can demonstrate the explicit checking for an exceptional case in the task in the example below.

# SuperFastPython.com
# example of checking for an exception raised within a task
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# mock task that will sleep for a moment
def work():
    sleep(1)
    raise Exception('Something bad happened!')
    return "never gets here"

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute our task
    future = executor.submit(work)
    # get the exception from the task, if any
    exception = future.exception()
    # handle exceptional case
    if exception:
        print(exception)
    else:
        result = future.result()
        print(result)

Running the example creates and submits the work per normal.

The recipient checks for the exceptional case, which blocks until an exception is raised or the task is completed. An exception is received and is handled by reporting it.
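
The other outcomes of exception() can be sketched as well. The example below assumes a single-worker pool so that a second task stays queued and can be cancelled; it shows a timeout while a task is still running, None for a task that succeeds, and a CancelledError for a cancelled task. The names gate, blocked, and queued are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, CancelledError, TimeoutError

def run_demo():
    outcomes = {}
    gate = threading.Event()
    # a single worker, so the second task waits in the queue
    with ThreadPoolExecutor(max_workers=1) as executor:
        blocked = executor.submit(gate.wait)
        queued = executor.submit(print, 'never runs')
        # the first task is still running, so the call times out
        try:
            blocked.exception(timeout=0.1)
        except TimeoutError:
            outcomes['timeout'] = True
        # a task that has not started can be cancelled
        outcomes['cancelled'] = queued.cancel()
        # release the first task so it completes successfully
        gate.set()
        outcomes['blocked_exception'] = blocked.exception()
        # a cancelled task raises instead of returning None
        try:
            queued.exception()
        except CancelledError:
            outcomes['cancelled_error'] = True
    return outcomes

if __name__ == '__main__':
    print(run_demo())
```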

Exception Handling Outside the Task with map()

What about tasks submitted via map()?

We cannot check the exception() function of the Future object for each task, as map() does not provide access to Future objects.

Worse still, the approach of handling the exception in the recipient cannot be used when using map() to submit tasks, unless you wrap the entire iteration.

We can demonstrate this by submitting one task with map() that happens to raise an Exception.

The complete example is listed below.

# SuperFastPython.com
# example of an unhandled exception in a task issued with map()
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# mock task that will sleep for a moment
def work(value):
    sleep(1)
    raise Exception('Something bad happened!')
    return f'Never gets here {value}'

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute our task
    for result in executor.map(work, [1]):
        print(result)

Running the example submits the single task (a bad use for map()) and waits for the first result.

The task raises an exception and the main thread exits, as we expected.

Traceback (most recent call last):
...
raise Exception('Something bad happened!')
Exception: Something bad happened!

This highlights that if map() is used to submit tasks to the thread pool, then the tasks should handle their own exceptions or be simple enough that exceptions are not expected.
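
If the tasks cannot handle their own exceptions, wrapping the entire iteration is the remaining option. The sketch below assumes a contrived work() function that fails for one input; note that the loop receives the results issued before the failing task and then stops, so later results are not retrieved.

```python
from concurrent.futures import ThreadPoolExecutor

# contrived task that fails for one specific input
def work(value):
    if value == 2:
        raise ValueError(f'Bad input: {value}')
    return value * 10

def run_demo():
    collected = []
    error = None
    with ThreadPoolExecutor() as executor:
        try:
            # results arrive in the order the tasks were issued
            for result in executor.map(work, range(5)):
                collected.append(result)
        except ValueError as exc:
            # the exception ends the iteration over the results
            error = str(exc)
    return collected, error

if __name__ == '__main__':
    print(run_demo())
```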


Exception Handling in Callbacks

A final case we must consider for exception handling when using the ThreadPoolExecutor is in callback functions.

When issuing tasks to the thread pool with a call to submit(), we receive a Future object in return on which we can register callback functions to call when the task completes via the add_done_callback() function.

This allows one or more callback functions to be registered that will be executed in the order in which they are registered.

You can learn more about adding callbacks to tasks in the ThreadPoolExecutor here:

  • How to Add a Callbacks to the ThreadPoolExecutor in Python

These callbacks are always called, even if the task is cancelled or fails itself with an exception.
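
Because a callback runs regardless of how the task ended, it can inspect the Future to find out what happened. The following sketch assumes a hypothetical report() callback that records the outcome in a module-level list; a single worker is used only to keep the order of the recorded outcomes predictable.

```python
from concurrent.futures import ThreadPoolExecutor

# outcomes recorded by the callback
outcomes = []

# hypothetical callback that checks how the task ended
def report(future):
    if future.cancelled():
        outcomes.append('cancelled')
    elif future.exception() is not None:
        outcomes.append(f'failed: {future.exception()}')
    else:
        outcomes.append(f'ok: {future.result()}')

def succeed():
    return 'done'

def fail():
    raise ValueError('Something bad happened!')

def run_demo():
    outcomes.clear()
    with ThreadPoolExecutor(max_workers=1) as executor:
        for function in (succeed, fail):
            executor.submit(function).add_done_callback(report)
    # the pool has shut down, so all callbacks have run
    return list(outcomes)

if __name__ == '__main__':
    print(run_demo())
```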

A callback can fail with an exception and it will not impact other callback functions that have been registered or the task.

The exception is caught, logged along with its traceback, and execution moves on to the next callback. In a sense, callbacks are able to fail quietly: the failure appears in the log but is not propagated to the code that submitted the task.

We can demonstrate this with a worked example with multiple callback functions, the first of which will raise an exception.

# SuperFastPython.com
# add callbacks to a future, one of which raises an exception
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# callback function to call when a task is completed
def custom_callback1(future):
    raise Exception('Something bad happened!')
    # never gets here
    print('Callback 1 called.')

# callback function to call when a task is completed
def custom_callback2(future):
    print('Callback 2 called.')

# mock task that will sleep for a moment
def work():
    sleep(1)
    return 'Task is done'

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute the task
    future = executor.submit(work)
    # add the custom callbacks
    future.add_done_callback(custom_callback1)
    future.add_done_callback(custom_callback2)
    # wait for the task to complete and get the result
    result = future.result()
    # wait for callbacks to finish
    sleep(0.1)
    print(result)

Running the example starts the thread pool as per normal and executes the task.

When the task completes, the first callback is called, which fails with an exception. The exception is logged and reported on the console (the default behavior for logging).

The thread pool is not broken and carries on.

The second callback is called successfully, and finally, the main thread gets the result of the task.

exception calling callback for <Future at 0x101d76730 state=finished returned str>
Traceback (most recent call last):
...
raise Exception('Something bad happened!')
Exception: Something bad happened!
Callback 2 called.
Task is done

This highlights that if callbacks are expected to raise an exception, the exception must be handled explicitly within the callback, and you must check for it yourself if you wish to have the failure impact the handling of the task itself.
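
One possible way to do this is to catch exceptions inside the callback and record them somewhere the main thread can inspect. The sketch below assumes a hypothetical fragile_step() that fails and a module-level callback_errors list; both are illustrative choices rather than part of the API.

```python
from concurrent.futures import ThreadPoolExecutor

# errors recorded by callbacks, checked later by the main thread
callback_errors = []

# hypothetical step inside the callback that fails
def fragile_step(future):
    raise RuntimeError('Callback failed!')

# callback that handles its own exceptions explicitly
def guarded_callback(future):
    try:
        fragile_step(future)
    except Exception as exc:
        callback_errors.append(exc)

def work():
    return 'Task is done'

def run_demo():
    callback_errors.clear()
    with ThreadPoolExecutor() as executor:
        future = executor.submit(work)
        future.add_done_callback(guarded_callback)
        result = future.result()
    # the pool has shut down, so the callback has finished
    return result, [str(error) for error in callback_errors]

if __name__ == '__main__':
    print(run_demo())
```

Checking the recorded errors after the pool has shut down avoids guessing how long to sleep while callbacks finish.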

Further Reading

This section provides additional resources that you may find helpful.

  • concurrent.futures - Launching parallel tasks
  • ThreadPoolExecutor: The Complete Guide
  • ThreadPoolExecutor Class API Cheat Sheet
  • Concurrent Futures API Interview Questions
  • ThreadPoolExecutor Jump-Start (my 7-day course)

Takeaways

You now know how to handle exceptions in the ThreadPoolExecutor in Python.

