Threading Helpers / Wrappers¶
Helper functions and classes to ease the use Thread’s with python’s threading library
Utilities for working with Event¶
Classes¶
Class
Description
BetterEvent is a sub-class of
Eventwith more flexibility + features
InvertibleEvent(this is just an alias for
BetterEvent)
Functions¶
Function
Description
Allows waiting for more than one
Event/BetterEventat onceA wrapper function for
event_multi_wait()withtrigger='and'as defaultA wrapper function for
event_multi_wait()withtrigger='or'as default
Utilities for working with Lock thread locks¶
In the Python standard library, you can only use context management directly against a Lock object, which means
you’re unable to specify things such as a timeout, whether or not to block, nor a built-in option to request an exception
to be raised if the lock can’t be acquired.
And thus, lock_acquire_timeout() was created - to solve all of the above problems, in one easy to use context management
function :)
The lock_acquire_timeout() function - a context manager ( with lock_acquire_timeout(lock) ), is designed to allow use of
context management with standard threading.Lock objects, with the ability to specify important parameters such as:
whether or not to block while acquiring the lock
an optional timeout - so that it gives up waiting for the
lock.acquireafter so many secondswhether to raise
LockWaitTimeoutif theacquiretimes out instead of returningNone
Functions¶
Function
Description
Flexible context manager for acquiring
Locks’swith lock_acquire_timeout(lock)
Utilities for working with Thread thread objects¶
Using SafeLoopThread for looping threads with queue.Queue’s + stop/pause support¶
First example - we’ll create a sub-class of SafeLoopThread called MyThread:
>>> # Create a sub-class of SafeLoopThread, and implement a loop() method
>>> class MyThread(SafeLoopThread):
... loop_sleep = 2 # 'run' will wait this many seconds between each run of your loop(). set to 0 to disable loop sleeps
... def loop(self):
... print("I'm looping!")
...
>>> t = MyThread() # Construct the class
Once we start the thread, we’ll see that I'm looping! will be printed about once every 2 seconds, since that’s what
we set loop_sleep to:
>>> t.start()
I'm looping!
I'm looping!
I'm looping!
Using SafeLoopThread.emit_pause() - we can pause the loop, which will silence the I'm looping! messages:
>>> t.emit_pause()
>>> # No output because the loop is now paused
To start the loop again, we can simply unpause it with SafeLoopThread.emit_unpause():
>>> t.emit_unpause()
I'm looping!
I'm looping!
I'm looping!
Once we’re done with MyThread, unlike a normal Thread, we can ask the thread to shutdown gracefully
using SafeLoopThread.emit_stop() like so:
>>> t.is_alive() # First we'll confirm the thread is still running
True
>>> t.emit_stop()
I'm looping!
>>> # The .loop method will finish it's current iteration (unless you add additional ``should_stop`` checks in the loop)
>>> # and then shutdown the thread by returning from ``.run``
>>> t.is_alive() # We can now see that the thread has shutdown as we requested it to.
False
Classes¶
Class
Description
A
Threadbase class which allows you easily add stop/pause support to your own threadsA
StopperThreadbased class which runs.loopin a loop, with stop/start support
-
class
privex.helpers.thread.BetterEvent(wait_on: str = 'set', name: str = None, default: bool = False, notify_set=True, notify_clear=True)[source]¶ BetterEvent(aliasInvertibleEvent) is a more flexible version ofthreading.Event, which adds many new capabilities / features on top ofthreading.Event:The
wait_onconstructor parameter allows you to choose what flag states that the standardwait()will trigger upon:'set'- the default - works likethreading.Event,wait()only triggers when the event is in the “set” state, i.e._flagisTrue'clear'- opposite of the default - works opposite tothreading.Event,wait()only triggers when the event is in the “clear” state, i.e._flagisFalse'both'- In thebothsetting,wait()will simply wait until_flagis changed, whether fromsettoclear, orcleartoset. This wait_on setting only works as long asnotify_setandnotify_clearare set toTrue
The
defaultconstructor parameter allows you to choose whether the event starts as “cleared” (False- default), or “set” (True), which is useful when using some of the alternativewait_onsettings.New
failparameter forwait(),wait_set()andwait_clear()- when this is set toTrue, the method will raiseEventWaitTimeoutwhen the timeout is hit, instead of just returningFalse.New
wait_set()method, this works like the classicthreading.Eventwaitmethod - it’s only triggered when_flagis set toTrue(set) - no matter whatwait_onsetting is active.New
wait_clear()method, this works opposite to the classicthreading.Eventwaitmethod - it’s only triggered when_flagis set toFalse(cleared) - no matter whatwait_onsetting is active.
Example Usage
Below is a very simple thread class using
SafeLoopThreadwhich uses aBetterEventso we can signal when it can start running, and when it’s allowed to restart itself:>>> from privex.helpers import BetterEvent, SafeLoopThread >>> >>> class MyThread(SafeLoopThread): ... def __init__(self, *args, trig, **kwargs): ... self.trig = trig ... super().__init__(*args, **kwargs) ... def loop(self): ... print("Waiting for trig to become set before doing stuff...") ... self.trig.wait() # Same behaviour as threading.Event.wait - waits for trig.set() ... print("trig is set. doing stuff...") ... print("finished doing stuff.") ... print("Waiting for trig to become clear before restarting loop...") ... self.trig.wait_clear() # Unlike threading.Event, BetterEvent allows waiting for the "clear" signal ... >>> evt = BetterEvent(name='My Event') >>> t = MyThread(trig=evt) >>> t.start() Waiting for trig to become set before doing stuff... >>> evt.set() # We flip evt (trig) to "set", which notifies MyThread it can proceed. trig is set. doing stuff... finished doing stuff. Waiting for trig to become clear before restarting loop... >>> evt.clear() # Unlike threading.Event, we can "clear" the event, and MyThread will detect the "clear" signal instantly. Waiting for trig to become set before doing stuff... >>> evt.set() # The loop restarted. Now we can flip trig back to "set" trig is set. doing stuff... finished doing stuff. Waiting for trig to become clear before restarting loop...
-
clear()[source]¶ Reset the internal flag to false.
Subsequently, threads calling wait() will block until set() is called to set the internal flag to true again.
-
set()[source]¶ Set the internal flag to true.
All threads waiting for it to become true are awakened. Threads that call wait() once the flag is true will not block at all.
-
wait(timeout: Optional[Union[int, float]] = None, fail=False) → bool[source]¶ Multi-purpose
waitmethod which works similarly tothreading.Event.wait()but with some extra features.This method’s behaviour will vary depending on what the
wait_onsetting is set to:'set'- the default - works likethreading.Event,wait()only triggers when the event is in the “set” state, i.e._flagisTrue'clear'- opposite of the default - works opposite tothreading.Event,wait()only triggers when the event is in the “clear” state, i.e._flagisFalse'both'- In thebothsetting,wait()will simply wait until_flagis changed, whether fromsettoclear, orcleartoset. This wait_on setting only works as long asnotify_setandnotify_clearare set toTrue
- Parameters
- Return bool signal
This method returns the internal flag on exit, so it will always return
Trueexcept if a timeout is given and the operation times out.
-
privex.helpers.thread.InvertibleEvent¶ alias of
privex.helpers.thread.BetterEvent
-
class
privex.helpers.thread.SafeLoopThread(*args, default_stop=False, default_pause=False, **kwargs)[source]¶ -
run() → None[source]¶ Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
-
-
class
privex.helpers.thread.StopperThread(*args, default_stop=False, default_pause=False, stop_events=None, pause_events=None, **kwargs)[source]¶ A
threading.Threadthread sub-class which implementsBetterEventevents allowing you to signal the thread when you want it to shutdown or pause.You must check
should_stop/should_runandshould_pausewithin your thread run body to detect when your thread needs to shutdown / pause.
-
privex.helpers.thread.event_multi_wait(*events: Union[privex.helpers.thread.BetterEvent, threading.Event], trigger='and', event_sleep=0.5, wait_timeout=None, fail=True, **kwargs) → Optional[Union[bool, List[Union[privex.helpers.thread.BetterEvent, threading.Event]]]][source]¶ Wait for multiple
threading.EventorBetterEvent‘s to become “set”, or “clear”.For standard
threading.Event‘s - only “set” can be waited on. You must useBetterEvent(generally works as a drop-in replacement) to be able to use theinvertorinvert_indexesoptions.Basic example:
>>> do_something_else = Event() >>> stop_running = Event() >>> >>> def some_thread(): ... # do some stuff... ... # now we wait for further instructions, until either do_something_else or stop_running is signalled: ... event_multi_wait(do_something_else, stop_running, trigger='any') ... if stop_running.is_set(): return False ... if do_something_else.is_set(): ... # do something else.
- Parameters
events (Event|BetterEvent) – Multiple
threading.Eventreferences to be waited on.trigger (str) – To return when ALL events are set, specify one of
and|all|every, while to return when ANY of the specified events are set, specify one ofor|any|eitherevent_sleep (float|int) – The maximum amount of time per event check iteration. This is divided by the amount of events which were passed, so we can use the highly efficient
event.wait()method.wait_timeout (float|int) – The maximum amount of time (in seconds) to wait for all/any of the
eventsto signal. Set toNoneto disable wait timeout. When timing out, raisesEventWaitTimeoutiffail=True, otherwise simply returnsNone.fail (bool) – When wait_timeout is hit, will raise
EventWaitTimeoutiffail=True, otherwise will simply returnNone.kwargs – Additional settings
- Key bool invert
(Default:
False) Wait foreventsto become “clear” (False). NOTE: This only works withBetterEventevents.- Key list invert_indexes
Wait for the events at these indexes to become “clear” instead of “set”. If
invertis set toTrue, then we’ll wait for these indexes to become “set” instead of “clear”. NOTE: This only works withBetterEventevents.- Return bool success
Trueifeventsmet thetrigger, otherwiseNone- Return List[_evt_btevt] events
If
BetterEventsare passed, andtriggeris “any”, then a list of the events which were set (or ifinvertisTrue, then events that weren’t set.
-
privex.helpers.thread.event_multi_wait_all(*events: Union[privex.helpers.thread.BetterEvent, threading.Event], event_sleep=0.5, wait_timeout=None, fail=True, **kwargs) → bool[source]¶ Wrapper function for
event_wait_many()withtriggerdefaulting toand(return when ALL events are triggered)
-
privex.helpers.thread.event_multi_wait_any(*events: Union[privex.helpers.thread.BetterEvent, threading.Event], event_sleep=0.5, wait_timeout=None, fail=True, **kwargs) → bool[source]¶ Wrapper function for
event_wait_many()withtriggerdefaulting toor(return when ANY event triggers)
-
privex.helpers.thread.lock_acquire_timeout(lock: _thread.allocate_lock, timeout: Union[int, float] = 10, fail=False, block=True)[source]¶ A context manager (
with lock_acquire_timeout(mylock) as locked:) for acquiring thread locks and waiting for them to be released, and giving up if the lock isn’t released withintimeout.Yields a boolean in the
withcontext which isTrueif thethreading.Lockwas acquired withintimeout, orFalseif it wasn’t.>>> from privex.helpers import lock_acquire_timeout >>> from threading import Lock >>> >>> my_lock = Lock() >>> >>> def some_func(): ... print("attempting to acquire a lock on 'my_lock'... will wait up to 30 secs...") ... with lock_acquire_timeout(my_lock, timeout=30) as locked: ... if not locked: ... raise Exception("Failed to acquire 'my_lock' after waiting 30 seconds!") ... print("successfully acquired a lock on 'my_lock'") ... print("finished. my_lock should have been automatically released.")
Original written by “robbles” on StackOverflow: https://stackoverflow.com/a/16782391/2648583
- Parameters
lock (Lock) – The
threading.Lockobject to attempt to acquire a lock ontimeout (int|float) – The amount of seconds to wait for
lockto be released if it’s already lockedfail (bool) – (Default:
False) If this isTrue, will raiseLockWaitTimeoutif we fail to acquire the locklockwithintimeoutseconds.block (bool) – If this is set to
False,timeoutwill be nulled and a non-blocking acquire will be done.
- Raises
LockWaitTimeout – When
failisTrueand we fail to acquirelockwithintimeoutseconds.
Thread Module Functions¶
Functions
event_multi_wait(*events[, trigger, …])Wait for multiple
threading.EventorBetterEvent‘s to become “set”, or “clear”.
event_multi_wait_all(*events[, event_sleep, …])Wrapper function for
event_wait_many()withtriggerdefaulting toand(return when ALL events are triggered)
event_multi_wait_any(*events[, event_sleep, …])Wrapper function for
event_wait_many()withtriggerdefaulting toor(return when ANY event triggers)
lock_acquire_timeout(lock[, timeout, fail, …])A context manager (
with lock_acquire_timeout(mylock) as locked:) for acquiring thread locks and waiting for them to be released, and giving up if the lock isn’t released withintimeout.
Thread Module Classes¶
Classes
BetterEvent(wait_on, name, default[, …])
BetterEvent(aliasInvertibleEvent) is a more flexible version ofthreading.Event, which adds many new capabilities / features on top ofthreading.Event:
SafeLoopThread(*args[, default_stop, …])
StopperThread(*args[, default_stop, …])A
threading.Threadthread sub-class which implementsBetterEventevents allowing you to signal the thread when you want it to shutdown or pause.