Threading Helpers / Wrappers
Helper functions and classes to ease the use of Threads with Python's threading library
Utilities for working with Event
Classes
Class            Description
BetterEvent      A sub-class of Event with more flexibility + features
InvertibleEvent  An alias for BetterEvent
Functions
Function              Description
event_multi_wait      Allows waiting for more than one Event / BetterEvent at once
event_multi_wait_all  A wrapper function for event_multi_wait() with trigger='and' as default
event_multi_wait_any  A wrapper function for event_multi_wait() with trigger='or' as default
Utilities for working with Lock thread locks
In the Python standard library, you can only use context management directly against a Lock object, which means you can't specify a timeout or whether to block, and there is no built-in option to request that an exception be raised if the lock can't be acquired.
And thus, lock_acquire_timeout() was created to solve all of the above problems in one easy-to-use context manager function :)
The lock_acquire_timeout() function, a context manager (with lock_acquire_timeout(lock)), is designed to allow use of context management with standard threading.Lock objects, with the ability to specify important parameters such as:
- whether or not to block while acquiring the lock
- an optional timeout, so that it gives up waiting for lock.acquire after so many seconds
- whether to raise LockWaitTimeout if the acquire times out, instead of returning None
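As a rough illustration of the idea only (this is not the library's actual implementation), a context manager offering these three options can be sketched with the standard library alone. The names acquire_with_timeout and LockTimeoutSketch are hypothetical stand-ins for lock_acquire_timeout() and LockWaitTimeout, and yielding the boolean result of acquire() is an assumption made for the sketch:

```python
import threading
from contextlib import contextmanager


class LockTimeoutSketch(Exception):
    """Hypothetical stand-in for LockWaitTimeout (illustration only)."""


@contextmanager
def acquire_with_timeout(lock, timeout=10, fail=False, block=True):
    # Lock.acquire() rejects a timeout when blocking=False,
    # so the timeout is replaced with -1 for a non-blocking attempt.
    acquired = lock.acquire(blocking=block, timeout=timeout if block else -1)
    if not acquired and fail:
        raise LockTimeoutSketch(f"could not acquire lock within {timeout} seconds")
    try:
        # The caller sees True if the lock is held, False otherwise.
        yield acquired
    finally:
        # Only release a lock that this context manager actually acquired.
        if acquired:
            lock.release()


my_lock = threading.Lock()
with acquire_with_timeout(my_lock, timeout=1) as locked:
    print("acquired:", locked)
```

The try/finally around the yield is what guarantees the lock is released even if the body of the with block raises.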
Functions
Function              Description
lock_acquire_timeout  Flexible context manager for acquiring Locks: with lock_acquire_timeout(lock)
Utilities for working with Thread thread objects
Using SafeLoopThread for looping threads with queue.Queue's + stop/pause support
First example - we'll create a sub-class of SafeLoopThread called MyThread:
>>> from privex.helpers import SafeLoopThread
>>> # Create a sub-class of SafeLoopThread, and implement a loop() method
>>> class MyThread(SafeLoopThread):
...     loop_sleep = 2  # 'run' will wait this many seconds between each run of your loop(). Set to 0 to disable loop sleeps
...     def loop(self):
...         print("I'm looping!")
...
>>> t = MyThread()  # Construct the class
Once we start the thread, we'll see that I'm looping! is printed about once every 2 seconds, since that's what we set loop_sleep to:
>>> t.start()
I'm looping!
I'm looping!
I'm looping!
Using SafeLoopThread.emit_pause(), we can pause the loop, which will silence the I'm looping! messages:
>>> t.emit_pause()
>>> # No output because the loop is now paused
To start the loop again, we can simply unpause it with SafeLoopThread.emit_unpause():
>>> t.emit_unpause()
I'm looping!
I'm looping!
I'm looping!
Once we're done with MyThread, unlike a normal Thread, we can ask the thread to shut down gracefully using SafeLoopThread.emit_stop() like so:
>>> t.is_alive() # First we'll confirm the thread is still running
True
>>> t.emit_stop()
I'm looping!
>>> # The .loop method will finish its current iteration (unless you add additional ``should_stop`` checks in the loop)
>>> # and then shutdown the thread by returning from ``.run``
>>> t.is_alive() # We can now see that the thread has shutdown as we requested it to.
False
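To make the stop/pause pattern above more concrete, here is a minimal standard-library sketch of a looping thread driven by two events. It is an illustration under assumptions, not how SafeLoopThread is actually implemented: LoopThreadSketch, ev_stop, ev_pause, pause_sleep and iterations are hypothetical names, and only emit_stop / emit_pause / emit_unpause / loop / loop_sleep mirror the names used in the walkthrough.

```python
import threading
import time


class LoopThreadSketch(threading.Thread):
    """Hypothetical illustration of a stop/pause-aware looping thread."""
    loop_sleep = 0.05   # seconds to wait between loop() calls
    pause_sleep = 0.05  # seconds between checks while paused

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.ev_stop = threading.Event()
        self.ev_pause = threading.Event()
        self.iterations = 0

    def emit_stop(self):
        self.ev_stop.set()

    def emit_pause(self):
        self.ev_pause.set()

    def emit_unpause(self):
        self.ev_pause.clear()

    def loop(self):
        # Sub-classes would override this with real work.
        self.iterations += 1

    def run(self):
        while not self.ev_stop.is_set():
            if self.ev_pause.is_set():
                # Paused: skip loop(), but wake early if a stop is requested.
                self.ev_stop.wait(self.pause_sleep)
                continue
            self.loop()
            # Sleeping via Event.wait() lets a stop request interrupt the sleep.
            self.ev_stop.wait(self.loop_sleep)


t = LoopThreadSketch(daemon=True)
t.start()
time.sleep(0.2)
t.emit_stop()
t.join(timeout=2)
print("still alive:", t.is_alive())
```

Using Event.wait() instead of time.sleep() between iterations is the design choice that lets the thread react to a stop request without waiting out the full sleep interval.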
Classes
Class           Description
StopperThread   A Thread base class which allows you to easily add stop/pause support to your own threads
SafeLoopThread  A StopperThread based class which runs .loop in a loop, with stop/start support
class privex.helpers.thread.BetterEvent(wait_on: str = 'set', name: str = None, default: bool = False, notify_set=True, notify_clear=True)
BetterEvent (alias InvertibleEvent) is a more flexible version of threading.Event, which adds many new capabilities / features on top of threading.Event:
- The wait_on constructor parameter allows you to choose which flag states the standard wait() will trigger upon:
    - 'set' - the default - works like threading.Event, wait() only triggers when the event is in the “set” state, i.e. _flag is True
    - 'clear' - opposite of the default - works opposite to threading.Event, wait() only triggers when the event is in the “clear” state, i.e. _flag is False
    - 'both' - In the both setting, wait() will simply wait until _flag is changed, whether from set to clear, or clear to set. This wait_on setting only works as long as notify_set and notify_clear are set to True
- The default constructor parameter allows you to choose whether the event starts as “cleared” (False - default), or “set” (True), which is useful when using some of the alternative wait_on settings.
- New fail parameter for wait(), wait_set() and wait_clear() - when this is set to True, the method will raise EventWaitTimeout when the timeout is hit, instead of just returning False.
- New wait_set() method, this works like the classic threading.Event wait method - it's only triggered when _flag is set to True (set) - no matter what wait_on setting is active.
- New wait_clear() method, this works opposite to the classic threading.Event wait method - it's only triggered when _flag is set to False (cleared) - no matter what wait_on setting is active.
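The difference between waiting for “set” and waiting for “clear” can be sketched with the standard library's threading.Condition. This is only an illustration of the concept, not BetterEvent's implementation; ClearableEventSketch and its internals are hypothetical, and the sketch omits wait_on, fail and the notify_* options entirely:

```python
import threading


class ClearableEventSketch:
    """Hypothetical event that can be waited on in either direction."""

    def __init__(self, default=False):
        self._cond = threading.Condition()
        self._flag = default

    def set(self):
        with self._cond:
            self._flag = True
            self._cond.notify_all()  # wake waiters so they re-check the flag

    def clear(self):
        with self._cond:
            self._flag = False
            self._cond.notify_all()  # notifying on clear is what enables wait_clear()

    def is_set(self):
        return self._flag

    def wait_set(self, timeout=None):
        # Returns True once the flag is True, or False if the timeout expires first.
        with self._cond:
            return self._cond.wait_for(lambda: self._flag, timeout)

    def wait_clear(self, timeout=None):
        # Returns True once the flag is False, or False if the timeout expires first.
        with self._cond:
            return self._cond.wait_for(lambda: not self._flag, timeout)


evt = ClearableEventSketch(default=True)
threading.Timer(0.1, evt.clear).start()
print("cleared in time:", evt.wait_clear(timeout=2))
```

A plain threading.Event only notifies waiters in set(), which is why it cannot offer an efficient wait for the “clear” state.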
Example Usage
Below is a very simple thread class using SafeLoopThread which uses a BetterEvent so we can signal when it can start running, and when it's allowed to restart itself:
>>> from privex.helpers import BetterEvent, SafeLoopThread
>>>
>>> class MyThread(SafeLoopThread):
...     def __init__(self, *args, trig, **kwargs):
...         self.trig = trig
...         super().__init__(*args, **kwargs)
...     def loop(self):
...         print("Waiting for trig to become set before doing stuff...")
...         self.trig.wait()  # Same behaviour as threading.Event.wait - waits for trig.set()
...         print("trig is set. doing stuff...")
...         print("finished doing stuff.")
...         print("Waiting for trig to become clear before restarting loop...")
...         self.trig.wait_clear()  # Unlike threading.Event, BetterEvent allows waiting for the "clear" signal
...
>>> evt = BetterEvent(name='My Event')
>>> t = MyThread(trig=evt)
>>> t.start()
Waiting for trig to become set before doing stuff...
>>> evt.set()  # We flip evt (trig) to "set", which notifies MyThread it can proceed.
trig is set. doing stuff...
finished doing stuff.
Waiting for trig to become clear before restarting loop...
>>> evt.clear()  # Unlike threading.Event, we can "clear" the event, and MyThread will detect the "clear" signal instantly.
Waiting for trig to become set before doing stuff...
>>> evt.set()  # The loop restarted. Now we can flip trig back to "set"
trig is set. doing stuff...
finished doing stuff.
Waiting for trig to become clear before restarting loop...
    clear()
    Reset the internal flag to false.
    Subsequently, threads calling wait() will block until set() is called to set the internal flag to true again.

    set()
    Set the internal flag to true.
    All threads waiting for it to become true are awakened. Threads that call wait() once the flag is true will not block at all.

    wait(timeout: Optional[Union[int, float]] = None, fail=False) → bool
    Multi-purpose wait method which works similarly to threading.Event.wait() but with some extra features.
    This method's behaviour will vary depending on what the wait_on setting is set to:
    - 'set' - the default - works like threading.Event, wait() only triggers when the event is in the “set” state, i.e. _flag is True
    - 'clear' - opposite of the default - works opposite to threading.Event, wait() only triggers when the event is in the “clear” state, i.e. _flag is False
    - 'both' - In the both setting, wait() will simply wait until _flag is changed, whether from set to clear, or clear to set. This wait_on setting only works as long as notify_set and notify_clear are set to True
    Parameters
    - timeout (int|float) – The maximum time to wait, in seconds. With the default of None, no timeout is applied.
    - fail (bool) – When True, raise EventWaitTimeout when the timeout is hit, instead of just returning False.
    Return bool signal
    This method returns the internal flag on exit, so it will always return True except if a timeout is given and the operation times out.

privex.helpers.thread.InvertibleEvent
alias of privex.helpers.thread.BetterEvent

class privex.helpers.thread.SafeLoopThread(*args, default_stop=False, default_pause=False, **kwargs)
    run() → None
    Method representing the thread's activity.
    You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class privex.helpers.thread.StopperThread(*args, default_stop=False, default_pause=False, stop_events=None, pause_events=None, **kwargs)
A threading.Thread sub-class which implements BetterEvent events, allowing you to signal the thread when you want it to shut down or pause.
You must check should_stop / should_run and should_pause within your thread run body to detect when your thread needs to shut down / pause.

privex.helpers.thread.event_multi_wait(*events: Union[privex.helpers.thread.BetterEvent, threading.Event], trigger='and', event_sleep=0.5, wait_timeout=None, fail=True, **kwargs) → Optional[Union[bool, List[Union[privex.helpers.thread.BetterEvent, threading.Event]]]]
Wait for multiple threading.Event or BetterEvent's to become “set”, or “clear”.
For standard threading.Event's, only “set” can be waited on. You must use BetterEvent (generally works as a drop-in replacement) to be able to use the invert or invert_indexes options.
Basic example:
>>> from threading import Event
>>> from privex.helpers.thread import event_multi_wait
>>>
>>> do_something_else = Event()
>>> stop_running = Event()
>>>
>>> def some_thread():
...     # do some stuff...
...     # now we wait for further instructions, until either do_something_else or stop_running is signalled:
...     event_multi_wait(do_something_else, stop_running, trigger='any')
...     if stop_running.is_set(): return False
...     if do_something_else.is_set():
...         pass  # do something else.
Parameters
- events (Event|BetterEvent) – Multiple threading.Event references to be waited on.
- trigger (str) – To return when ALL events are set, specify one of and|all|every, while to return when ANY of the specified events are set, specify one of or|any|either
- event_sleep (float|int) – The maximum amount of time per event check iteration. This is divided by the number of events which were passed, so we can use the highly efficient event.wait() method.
- wait_timeout (float|int) – The maximum amount of time (in seconds) to wait for all/any of the events to signal. Set to None to disable wait timeout. When timing out, raises EventWaitTimeout if fail=True, otherwise simply returns None.
- fail (bool) – When wait_timeout is hit, will raise EventWaitTimeout if fail=True, otherwise will simply return None.
- kwargs – Additional settings
Key bool invert
(Default: False) Wait for events to become “clear” (False). NOTE: This only works with BetterEvent events.
Key list invert_indexes
Wait for the events at these indexes to become “clear” instead of “set”. If invert is set to True, then we'll wait for these indexes to become “set” instead of “clear”. NOTE: This only works with BetterEvent events.
Return bool success
True if events met the trigger, otherwise None
Return List[_evt_btevt] events
If BetterEvents are passed, and trigger is “any”, then a list of the events which were set (or if invert is True, then events that weren't set).
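The trigger, event_sleep and wait_timeout parameters described above can be illustrated with a small standard-library polling loop. This is a sketch under assumptions rather than the library's implementation: wait_many_sketch is a hypothetical name, it handles only plain threading.Event objects, always returns None on timeout (no fail option), and ignores invert / invert_indexes.

```python
import threading
import time


def wait_many_sketch(*events, trigger='and', event_sleep=0.5, wait_timeout=None):
    """Hypothetical illustration: wait until ALL or ANY of the events are set."""
    if not events:
        raise ValueError("at least one event is required")
    want_all = trigger in ('and', 'all', 'every')
    deadline = None if wait_timeout is None else time.monotonic() + wait_timeout
    # Split the per-iteration sleep across the events, so one full pass
    # over every event takes at most roughly event_sleep seconds.
    per_event = event_sleep / len(events)
    while True:
        # Event.wait() returns immediately for events that are already set.
        states = [ev.wait(per_event) for ev in events]
        satisfied = all(states) if want_all else any(states)
        if satisfied:
            return True
        if deadline is not None and time.monotonic() >= deadline:
            return None


a, b = threading.Event(), threading.Event()
a.set()
print("any:", wait_many_sketch(a, b, trigger='or', event_sleep=0.1, wait_timeout=1))
```

Because the deadline is only checked after each pass, the call can overshoot wait_timeout by up to one event_sleep interval.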

privex.helpers.thread.event_multi_wait_all(*events: Union[privex.helpers.thread.BetterEvent, threading.Event], event_sleep=0.5, wait_timeout=None, fail=True, **kwargs) → bool
Wrapper function for event_multi_wait() with trigger defaulting to and (return when ALL events are triggered)

privex.helpers.thread.event_multi_wait_any(*events: Union[privex.helpers.thread.BetterEvent, threading.Event], event_sleep=0.5, wait_timeout=None, fail=True, **kwargs) → bool
Wrapper function for event_multi_wait() with trigger defaulting to or (return when ANY event triggers)

privex.helpers.thread.lock_acquire_timeout(lock: _thread.allocate_lock, timeout: Union[int, float] = 10, fail=False, block=True)
A context manager (with lock_acquire_timeout(mylock) as locked:) for acquiring thread locks and waiting for them to be released, and giving up if the lock isn't released within timeout.
Yields a boolean in the with context which is True if the threading.Lock was acquired within timeout, or False if it wasn't.
>>> from privex.helpers import lock_acquire_timeout
>>> from threading import Lock
>>>
>>> my_lock = Lock()
>>>
>>> def some_func():
...     print("attempting to acquire a lock on 'my_lock'... will wait up to 30 secs...")
...     with lock_acquire_timeout(my_lock, timeout=30) as locked:
...         if not locked:
...             raise Exception("Failed to acquire 'my_lock' after waiting 30 seconds!")
...         print("successfully acquired a lock on 'my_lock'")
...     print("finished. my_lock should have been automatically released.")
Originally written by “robbles” on StackOverflow: https://stackoverflow.com/a/16782391/2648583
Parameters
- lock (Lock) – The threading.Lock object to attempt to acquire a lock on
- timeout (int|float) – The number of seconds to wait for lock to be released if it's already locked
- fail (bool) – (Default: False) If this is True, will raise LockWaitTimeout if we fail to acquire the lock lock within timeout seconds.
- block (bool) – If this is set to False, timeout will be nulled and a non-blocking acquire will be done.
Raises
LockWaitTimeout – When fail is True and we fail to acquire lock within timeout seconds.
Thread Module Functions
Functions
event_multi_wait(*events[, trigger, …])
    Wait for multiple threading.Event or BetterEvent's to become “set”, or “clear”.
event_multi_wait_all(*events[, event_sleep, …])
    Wrapper function for event_multi_wait() with trigger defaulting to and (return when ALL events are triggered)
event_multi_wait_any(*events[, event_sleep, …])
    Wrapper function for event_multi_wait() with trigger defaulting to or (return when ANY event triggers)
lock_acquire_timeout(lock[, timeout, fail, …])
    A context manager (with lock_acquire_timeout(mylock) as locked:) for acquiring thread locks and waiting for them to be released, and giving up if the lock isn't released within timeout.
Thread Module Classes
Classes
BetterEvent(wait_on, name, default[, …])
    BetterEvent (alias InvertibleEvent) is a more flexible version of threading.Event, which adds many new capabilities / features on top of threading.Event.
SafeLoopThread(*args[, default_stop, …])
StopperThread(*args[, default_stop, …])
    A threading.Thread sub-class which implements BetterEvent events, allowing you to signal the thread when you want it to shut down or pause.