Threading Helpers / Wrappers

Helper functions and classes to ease the use Thread’s with python’s threading library

Utilities for working with Event

Classes

Class

Description

BetterEvent

BetterEvent is a sub-class of Event with more flexibility + features

InvertibleEvent

(this is just an alias for BetterEvent)

Functions

Function

Description

event_multi_wait()

Allows waiting for more than one Event / BetterEvent at once

event_multi_wait_all()

A wrapper function for event_multi_wait() with trigger='and' as default

event_multi_wait_any()

A wrapper function for event_multi_wait() with trigger='or' as default

Utilities for working with Lock thread locks

In the Python standard library, you can only use context management directly against a Lock object, which means you’re unable to specify things such as a timeout, whether or not to block, nor a built-in option to request an exception to be raised if the lock can’t be acquired.

And thus, lock_acquire_timeout() was created - to solve all of the above problems, in one easy to use context management function :)

The lock_acquire_timeout() function - a context manager ( with lock_acquire_timeout(lock) ), is designed to allow use of context management with standard threading.Lock objects, with the ability to specify important parameters such as:

  • whether or not to block while acquiring the lock

  • an optional timeout - so that it gives up waiting for the lock.acquire after so many seconds

  • whether to raise LockWaitTimeout if the acquire times out instead of returning None

Functions

Function

Description

lock_acquire_timeout()

Flexible context manager for acquiring Locks’s with lock_acquire_timeout(lock)

Utilities for working with Thread thread objects

Using SafeLoopThread for looping threads with queue.Queue’s + stop/pause support

First example - we’ll create a sub-class of SafeLoopThread called MyThread:

>>> # Create a sub-class of SafeLoopThread, and implement a loop() method
>>> class MyThread(SafeLoopThread):
...     loop_sleep = 2    # 'run' will wait this many seconds between each run of your loop(). set to 0 to disable loop sleeps
...     def loop(self):
...         print("I'm looping!")
...
>>> t = MyThread()   # Construct the class

Once we start the thread, we’ll see that I'm looping! will be printed about once every 2 seconds, since that’s what we set loop_sleep to:

>>> t.start()
I'm looping!
I'm looping!
I'm looping!

Using SafeLoopThread.emit_pause() - we can pause the loop, which will silence the I'm looping! messages:

>>> t.emit_pause()
>>> # No output because the loop is now paused

To start the loop again, we can simply unpause it with SafeLoopThread.emit_unpause():

>>> t.emit_unpause()
I'm looping!
I'm looping!
I'm looping!

Once we’re done with MyThread, unlike a normal Thread, we can ask the thread to shutdown gracefully using SafeLoopThread.emit_stop() like so:

>>> t.is_alive()    # First we'll confirm the thread is still running
True
>>> t.emit_stop()
I'm looping!
>>> # The .loop method will finish it's current iteration (unless you add additional ``should_stop`` checks in the loop)
>>> # and then shutdown the thread by returning from ``.run``
>>> t.is_alive()    # We can now see that the thread has shutdown as we requested it to.
False

Classes

Class

Description

StopperThread

A Thread base class which allows you easily add stop/pause support to your own threads

SafeLoopThread

A StopperThread based class which runs .loop in a loop, with stop/start support

class privex.helpers.thread.BetterEvent(wait_on: str = 'set', name: str = None, default: bool = False, notify_set=True, notify_clear=True)[source]

BetterEvent (alias InvertibleEvent) is a more flexible version of threading.Event, which adds many new capabilities / features on top of threading.Event:

  • The wait_on constructor parameter allows you to choose what flag states that the standard wait() will trigger upon:

    • 'set' - the default - works like threading.Event, wait() only triggers when the event is in the “set” state, i.e. _flag is True

    • 'clear' - opposite of the default - works opposite to threading.Event, wait() only triggers when the event is in the “clear” state, i.e. _flag is False

    • 'both' - In the both setting, wait() will simply wait until _flag is changed, whether from set to clear, or clear to set. This wait_on setting only works as long as notify_set and notify_clear are set to True

  • The default constructor parameter allows you to choose whether the event starts as “cleared” (False - default), or “set” (True), which is useful when using some of the alternative wait_on settings.

  • New fail parameter for wait(), wait_set() and wait_clear() - when this is set to True, the method will raise EventWaitTimeout when the timeout is hit, instead of just returning False.

  • New wait_set() method, this works like the classic threading.Event wait method - it’s only triggered when _flag is set to True (set) - no matter what wait_on setting is active.

  • New wait_clear() method, this works opposite to the classic threading.Event wait method - it’s only triggered when _flag is set to False (cleared) - no matter what wait_on setting is active.

Example Usage

Below is a very simple thread class using SafeLoopThread which uses a BetterEvent so we can signal when it can start running, and when it’s allowed to restart itself:

>>> from privex.helpers import BetterEvent, SafeLoopThread
>>>
>>> class MyThread(SafeLoopThread):
...     def __init__(self, *args, trig, **kwargs):
...         self.trig = trig
...         super().__init__(*args, **kwargs)
...     def loop(self):
...         print("Waiting for trig to become set before doing stuff...")
...         self.trig.wait()   # Same behaviour as threading.Event.wait - waits for trig.set()
...         print("trig is set. doing stuff...")
...         print("finished doing stuff.")
...         print("Waiting for trig to become clear before restarting loop...")
...         self.trig.wait_clear()  # Unlike threading.Event, BetterEvent allows waiting for the "clear" signal
...
>>> evt = BetterEvent(name='My Event')
>>> t = MyThread(trig=evt)
>>> t.start()
Waiting for trig to become set before doing stuff...
>>> evt.set()   # We flip evt (trig) to "set", which notifies MyThread it can proceed.
trig is set. doing stuff...
finished doing stuff.
Waiting for trig to become clear before restarting loop...
>>> evt.clear()  # Unlike threading.Event, we can "clear" the event, and MyThread will detect the "clear" signal instantly.
Waiting for trig to become set before doing stuff...
>>> evt.set()    # The loop restarted. Now we can flip trig back to "set"
trig is set. doing stuff...
finished doing stuff.
Waiting for trig to become clear before restarting loop...
clear()[source]

Reset the internal flag to false.

Subsequently, threads calling wait() will block until set() is called to set the internal flag to true again.

set()[source]

Set the internal flag to true.

All threads waiting for it to become true are awakened. Threads that call wait() once the flag is true will not block at all.

wait(timeout: Optional[Union[int, float]] = None, fail=False)bool[source]

Multi-purpose wait method which works similarly to threading.Event.wait() but with some extra features.

This method’s behaviour will vary depending on what the wait_on setting is set to:

  • 'set' - the default - works like threading.Event, wait() only triggers when the event is in the “set” state, i.e. _flag is True

  • 'clear' - opposite of the default - works opposite to threading.Event, wait() only triggers when the event is in the “clear” state, i.e. _flag is False

  • 'both' - In the both setting, wait() will simply wait until _flag is changed, whether from set to clear, or clear to set. This wait_on setting only works as long as notify_set and notify_clear are set to True

Parameters
  • fail (bool) – If True, raise EventWaitTimeout if timeout was reached while waiting for the event to change.

  • timeout (float) – Maximum amount of time to wait before giving up (None to disable timeout)

Return bool signal

This method returns the internal flag on exit, so it will always return True except if a timeout is given and the operation times out.

wait_clear(timeout: Optional[float] = None, fail=False)bool[source]

Wait until _flag is False

wait_set(timeout: Optional[float] = None, fail=False)bool[source]

Wait until _flag is True

privex.helpers.thread.InvertibleEvent

alias of privex.helpers.thread.BetterEvent

class privex.helpers.thread.SafeLoopThread(*args, default_stop=False, default_pause=False, **kwargs)[source]
run()None[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class privex.helpers.thread.StopperThread(*args, default_stop=False, default_pause=False, stop_events=None, pause_events=None, **kwargs)[source]

A threading.Thread thread sub-class which implements BetterEvent events allowing you to signal the thread when you want it to shutdown or pause.

You must check should_stop / should_run and should_pause within your thread run body to detect when your thread needs to shutdown / pause.

privex.helpers.thread.event_multi_wait(*events: Union[privex.helpers.thread.BetterEvent, threading.Event], trigger='and', event_sleep=0.5, wait_timeout=None, fail=True, **kwargs) → Optional[Union[bool, List[Union[privex.helpers.thread.BetterEvent, threading.Event]]]][source]

Wait for multiple threading.Event or BetterEvent ‘s to become “set”, or “clear”.

For standard threading.Event ‘s - only “set” can be waited on. You must use BetterEvent (generally works as a drop-in replacement) to be able to use the invert or invert_indexes options.

Basic example:

>>> do_something_else = Event()
>>> stop_running = Event()
>>>
>>> def some_thread():
...     # do some stuff...
...     # now we wait for further instructions, until either do_something_else or stop_running is signalled:
...     event_multi_wait(do_something_else, stop_running, trigger='any')
...     if stop_running.is_set(): return False
...     if do_something_else.is_set():
...         # do something else.
Parameters
  • events (Event|BetterEvent) – Multiple threading.Event references to be waited on.

  • trigger (str) – To return when ALL events are set, specify one of and|all|every, while to return when ANY of the specified events are set, specify one of or|any|either

  • event_sleep (float|int) – The maximum amount of time per event check iteration. This is divided by the amount of events which were passed, so we can use the highly efficient event.wait() method.

  • wait_timeout (float|int) – The maximum amount of time (in seconds) to wait for all/any of the events to signal. Set to None to disable wait timeout. When timing out, raises EventWaitTimeout if fail=True, otherwise simply returns None.

  • fail (bool) – When wait_timeout is hit, will raise EventWaitTimeout if fail=True, otherwise will simply return None.

  • kwargs – Additional settings

Key bool invert

(Default: False) Wait for events to become “clear” (False). NOTE: This only works with BetterEvent events.

Key list invert_indexes

Wait for the events at these indexes to become “clear” instead of “set”. If invert is set to True, then we’ll wait for these indexes to become “set” instead of “clear”. NOTE: This only works with BetterEvent events.

Return bool success

True if events met the trigger, otherwise None

Return List[_evt_btevt] events

If BetterEvents are passed, and trigger is “any”, then a list of the events which were set (or if invert is True, then events that weren’t set.

privex.helpers.thread.event_multi_wait_all(*events: Union[privex.helpers.thread.BetterEvent, threading.Event], event_sleep=0.5, wait_timeout=None, fail=True, **kwargs)bool[source]

Wrapper function for event_wait_many() with trigger defaulting to and (return when ALL events are triggered)

privex.helpers.thread.event_multi_wait_any(*events: Union[privex.helpers.thread.BetterEvent, threading.Event], event_sleep=0.5, wait_timeout=None, fail=True, **kwargs)bool[source]

Wrapper function for event_wait_many() with trigger defaulting to or (return when ANY event triggers)

privex.helpers.thread.lock_acquire_timeout(lock: _thread.allocate_lock, timeout: Union[int, float] = 10, fail=False, block=True)[source]

A context manager (with lock_acquire_timeout(mylock) as locked:) for acquiring thread locks and waiting for them to be released, and giving up if the lock isn’t released within timeout.

Yields a boolean in the with context which is True if the threading.Lock was acquired within timeout, or False if it wasn’t.

>>> from privex.helpers import lock_acquire_timeout
>>> from threading import Lock
>>>
>>> my_lock = Lock()
>>>
>>> def some_func():
...     print("attempting to acquire a lock on 'my_lock'... will wait up to 30 secs...")
...     with lock_acquire_timeout(my_lock, timeout=30) as locked:
...         if not locked:
...             raise Exception("Failed to acquire 'my_lock' after waiting 30 seconds!")
...         print("successfully acquired a lock on 'my_lock'")
...     print("finished. my_lock should have been automatically released.")

Original written by “robbles” on StackOverflow: https://stackoverflow.com/a/16782391/2648583

Parameters
  • lock (Lock) – The threading.Lock object to attempt to acquire a lock on

  • timeout (int|float) – The amount of seconds to wait for lock to be released if it’s already locked

  • fail (bool) – (Default: False) If this is True, will raise LockWaitTimeout if we fail to acquire the lock lock within timeout seconds.

  • block (bool) – If this is set to False, timeout will be nulled and a non-blocking acquire will be done.

Raises

LockWaitTimeout – When fail is True and we fail to acquire lock within timeout seconds.

Thread Module Functions

Functions

event_multi_wait(*events[, trigger, …])

Wait for multiple threading.Event or BetterEvent ‘s to become “set”, or “clear”.

event_multi_wait_all(*events[, event_sleep, …])

Wrapper function for event_wait_many() with trigger defaulting to and (return when ALL events are triggered)

event_multi_wait_any(*events[, event_sleep, …])

Wrapper function for event_wait_many() with trigger defaulting to or (return when ANY event triggers)

lock_acquire_timeout(lock[, timeout, fail, …])

A context manager (with lock_acquire_timeout(mylock) as locked:) for acquiring thread locks and waiting for them to be released, and giving up if the lock isn’t released within timeout.

Thread Module Classes

Classes

BetterEvent(wait_on, name, default[, …])

BetterEvent (alias InvertibleEvent) is a more flexible version of threading.Event, which adds many new capabilities / features on top of threading.Event:

SafeLoopThread(*args[, default_stop, …])

StopperThread(*args[, default_stop, …])

A threading.Thread thread sub-class which implements BetterEvent events allowing you to signal the thread when you want it to shutdown or pause.