AsyncIO Helpers / Wrappers
Functions and classes related to working with Python’s native asyncio support.
To avoid issues with the async keyword, this file is named asyncx instead of async.
Copyright:
+===================================================+
| © 2019 Privex Inc. |
| https://www.privex.io |
+===================================================+
| |
| Originally Developed by Privex Inc. |
| License: X11 / MIT |
| |
| Core Developer(s): |
| |
| (+) Chris (@someguy123) [Privex] |
| (+) Kale (@kryogenic) [Privex] |
| |
+===================================================+
Copyright 2019 Privex Inc. ( https://www.privex.io )
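Functions
| async_sync(f) | Async Synchronous Decorator, borrowed from https://stackoverflow.com/a/23036785/2648583 - added this PyDoc comment and support for returning data from a synchronous function |
| await_if_needed(func, *args, **kwargs) | Call, await, and/or simply return func depending on whether it’s an async function reference (coroutine function), a non-awaited coroutine, a standard synchronous function, or just a plain old string. |
| awaitable(func) | Decorator which helps with creation of async wrapper functions. |
| awaitable_class(cls) | Wraps a class, allowing all async methods to be used in non-async code as if they were normal synchronous methods. |
| call_sys_async(proc, *args, write, **kwargs) | Async version of call_sys() |
| coro_thread_func(func, *t_args, **t_kwargs) | This function is not intended to be called directly. |
| get_async_type(obj) | Detects if obj is an async object/function that needs to be awaited / called, whether it’s a synchronous callable, or whether it’s unknown (probably not async) |
| is_async_context() | Returns True if currently in an async context, otherwise False |
| loop_run(coro, *args, **kwargs) | Run the coroutine or async function coro synchronously, using an AsyncIO event loop. |
| run_coro_thread(func, *args, **kwargs) | Run a Python AsyncIO coroutine function within a new event loop using a thread, and return the result / raise any exceptions as if it were run normally within an AsyncIO function. |
| run_coro_thread_async(func, *args, **kwargs) | AsyncIO version of run_coro_thread() which uses asyncio.sleep() while waiting on a result from the queue. |
| run_coro_thread_base(func, *args, **kwargs) | This is a wrapper function which runs coro_thread_func() within a thread, passing func, args and kwargs to it. |
| run_sync(func, *args, **kwargs) | Run an async function synchronously (useful for REPL testing async functions). |
Classes
| aobject(*a, **kw) | Inheriting this class allows you to define an async __init__. |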
- privex.helpers.asyncx.AWAITABLE_BLACKLIST: List[str] = []
A list of fully qualified module paths to functions/methods. If any of these functions/methods call an awaitable() decorated function/method, the awaitable will be run synchronously regardless of whether there’s an active AsyncIO context.
- privex.helpers.asyncx.AWAITABLE_BLACKLIST_FUNCS: List[str] = []
A list of plain function names for which awaitable() decorated functions/methods should always run synchronously.
- privex.helpers.asyncx.AWAITABLE_BLACKLIST_MODS: List[str] = []
A list of fully qualified module names for which awaitable() decorated functions/methods should always run synchronously.
- class privex.helpers.asyncx.aobject(*a, **kw)
Inheriting this class allows you to define an async __init__.
To use async constructors, you must construct your class using await MyClass(params)
Example:
>>> class SomeClass(aobject):
...     async def __init__(self, some_param='x'):
...         self.some_param = some_param
...         self.example = await self.test_async()
...
...     async def test_async(self):
...         return "hello world"
...
>>> async def main():
...     some_class = await SomeClass('testing')
...     print(some_class.example)
...
Note: Some IDEs like PyCharm may complain about having async __new__ and __init__, but it does work with Python 3.6+.
You may be able to work around the syntax error in your sub-class by defining your __init__ method under a different name, and then assigning __init__ = _your_real_init, much like this class does.
Original source: https://stackoverflow.com/a/45364670
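The async constructor pattern described above can be sketched with the standard library alone. This is a minimal illustration in the spirit of the linked Stack Overflow answer, not the library's own source; AsyncInitBase, Greeter and make_greeter are hypothetical names used only for this example.

```python
import asyncio


class AsyncInitBase:
    """Hypothetical stand-in for aobject: allows `async def __init__`."""

    async def __new__(cls, *args, **kwargs):
        # Calling the class returns this coroutine rather than an instance,
        # so Python skips its usual automatic __init__ call.
        instance = super().__new__(cls)
        await instance.__init__(*args, **kwargs)
        return instance

    async def __init__(self):
        pass


class Greeter(AsyncInitBase):
    async def __init__(self, name):
        await asyncio.sleep(0)  # stands in for real async setup work
        self.message = f"hello {name}"


async def make_greeter():
    # The instance only exists once the constructor call has been awaited.
    return await Greeter("world")
```

Because the constructor call yields a coroutine, forgetting the await leaves you holding a coroutine object instead of an instance.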
- privex.helpers.asyncx.async_sync(f)
Async Synchronous Decorator, borrowed from https://stackoverflow.com/a/23036785/2648583 - added this PyDoc comment and support for returning data from a synchronous function
Allows a non-async function to run async functions using yield from - and can also return data.
Useful for unit testing, since unittest.TestCase functions are synchronous.
Example async function:
>>> async def my_async_func(a, b, x=None, y=None):
...     return a, b, x, y
...
Using the above async function with a non-async function:
>>> @async_sync
... def sync_function():
...     result = yield from my_async_func(1, 2, x=3, y=4)
...     return result
...
>>> r = sync_function()
>>> print(r)
(1, 2, 3, 4)
>>> print(r[1])
2
- async privex.helpers.asyncx.await_if_needed(func: Union[callable, Coroutine, Awaitable, Any], *args, **kwargs)
Call, await, and/or simply return func depending on whether it’s an async function reference (coroutine function), a non-awaited coroutine, a standard synchronous function, or just a plain old string.
Helps take the guesswork out of parameters which could be a string, a synchronous function, an async function, or a coroutine which hasn’t been awaited.
>>> def sync_func(hello, world=1):
...     return f"sync hello: {hello} {world}"
>>> async def async_func(hello, world=1):
...     return f"async hello: {hello} {world}"
>>> await await_if_needed(sync_func, 3, world=2)
'sync hello: 3 2'
>>> await await_if_needed(async_func, 5, 4)
'async hello: 5 4'
>>> f = async_func(5, 4)
>>> await await_if_needed(f)
'async hello: 5 4'
- Parameters
func (callable|Coroutine|Awaitable|Any) – The function/object to await/call if needed.
args – If func is a function/method, will forward any positional arguments to the function
kwargs – If func is a function/method, will forward any keyword arguments to the function
- Return Any func_data
The result of the awaited func, or the original func if it is neither a coroutine nor callable/awaitable
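The dispatch described for await_if_needed can be approximated with the inspect module. The sketch below is an assumption about how such a helper might be written, not the library's actual code; await_if_needed_sketch is a hypothetical name.

```python
import asyncio
import inspect


async def await_if_needed_sketch(func, *args, **kwargs):
    """Illustrative only: call and/or await `func` depending on what it is."""
    if inspect.iscoroutinefunction(func):
        # async def reference: call it, then await the coroutine it returns
        return await func(*args, **kwargs)
    if inspect.isawaitable(func):
        # an un-awaited coroutine (or other awaitable): just await it
        return await func
    if callable(func):
        # plain synchronous callable; await its result only if that is awaitable
        result = func(*args, **kwargs)
        if inspect.isawaitable(result):
            return await result
        return result
    # anything else (e.g. a string) is handed back untouched
    return func
```

Checking for a coroutine function before checking callable matters, since an async function reference is also callable.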
- privex.helpers.asyncx.awaitable(func: Callable) → Callable
Decorator which helps with creation of async wrapper functions.
Usage
Define your async function as normal, then create a standard Python function using this decorator - the function should just call your async function and return it.
>>> async def some_func_async(a: str, b: str):
...     c = a + b
...     return c
...
>>> @awaitable
... def some_func(a, b) -> Union[str, Coroutine[Any, Any, str]]:
...     return some_func_async(a, b)
...
Now, inside of async functions, we just await the wrapper function as if it were the original async function.
>>> async def my_async_func():
...     res = await some_func("hello", "world")
...
While inside of synchronous functions, we call the wrapper function as if it were a normal synchronous function. The decorator will create an asyncio event loop, run the function, then return the result - transparent to the calling function.
>>> def my_sync_func():
...     res = some_func("hello", "world")
...
Blacklists
If you mix a lot of synchronous and asynchronous code, sniffio may return coroutines to synchronous functions that were called from asynchronous functions, which can of course cause problems.
To avoid this issue, you can blacklist function names, module names (and their sub-modules), and/or fully qualified module paths to functions/methods.
Three blacklists are available in this module, which allow you to specify caller functions/methods, modules, or fully qualified module paths to functions/methods for which awaitable() wrapped functions/methods should always execute in an event loop and return synchronously.
Example:
>>> from privex.helpers import asyncx
>>> # All code within the module 'some.module' and its sub-modules will always have awaitable's run their wrapped
>>> # functions synchronously.
>>> asyncx.AWAITABLE_BLACKLIST_MODS += ['some.module']
>>> # Whenever a function with the name 'example_func' (in any module) calls an awaitable, it will always run synchronously
>>> asyncx.AWAITABLE_BLACKLIST_FUNCS += ['example_func']
>>> # Whenever the specific class method 'other.module.SomeClass.some_sync' calls an awaitable, it will always run synchronously.
>>> asyncx.AWAITABLE_BLACKLIST += ['other.module.SomeClass.some_sync']
Original source: https://github.com/encode/httpx/issues/572#issuecomment-562179966
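The core idea behind the decorator (return the coroutine when an event loop is running, otherwise run it to completion) can be sketched as follows. This is a simplified illustration: it assumes asyncio.get_running_loop() as the context check rather than sniffio, and it omits the blacklists entirely. awaitable_sketch is a hypothetical name.

```python
import asyncio
import functools


def awaitable_sketch(func):
    """Illustrative only: a wrapper usable from both sync and async code."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        coro = func(*args, **kwargs)
        try:
            # Assumption: a running loop means the caller is async code
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: run the coroutine and hand back its result
            return asyncio.run(coro)
        # Inside async code: return the coroutine for the caller to await
        return coro

    return wrapper


async def some_func_async(a: str, b: str) -> str:
    return a + b


@awaitable_sketch
def some_func(a, b):
    return some_func_async(a, b)
```

The failure mode the blacklists address is visible here: a synchronous function called from within a running loop would receive a coroutine back instead of a result.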
- privex.helpers.asyncx.awaitable_class(cls: Type[T]) → Type[T]
Wraps a class, allowing all async methods to be used in non-async code as if they were normal synchronous methods.
Example Usage
Simply decorate your class with @awaitable_class (no brackets! takes no arguments), and once you create an instance of your class, all of your async methods can be used by synchronous code as if they were plain functions:
>>> from privex.helpers import awaitable_class
>>>
>>> @awaitable_class
... class ExampleAsyncCls:
...     async def example_async(self):
...         return "hello async world"
...
...     def example_sync(self):
...         return "hello non-async world"
...
NOTE - You can also wrap a class without using a decorator - just pass the class as the first argument like so:
>>> class _OtherExample:
...     async def hello(self):
...         return 'world'
>>> OtherExample = awaitable_class(_OtherExample)
If we call .example_async() on the above class from a synchronous REPL, it will return 'hello async world' as if it were a normal synchronous method. We can also call the non-async .example_sync() which works like normal:
>>> k = ExampleAsyncCls()
>>> k.example_async()
'hello async world'
>>> k.example_sync()
'hello non-async world'
However, inside of an async context (e.g. an async function), awaitable_class will be returning coroutines, so you should await the methods, as you would expect when dealing with an async function:
>>> async def test_async():
...     exmp = ExampleAsyncCls()
...     return await exmp.example_async()
...
>>> await test_async()
'hello async world'
- Parameters
cls (type) – The class to wrap
- Return type wrapped_class
The class after being wrapped
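One way to picture the behaviour above is a class decorator that replaces each async method with a wrapper that runs it when no event loop is active. This is a rough sketch under that assumption; the library's real mechanism may differ, and awaitable_class_sketch and _sync_or_coro are hypothetical names.

```python
import asyncio
import functools
import inspect


def _sync_or_coro(method):
    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        coro = method(*args, **kwargs)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(coro)  # sync caller: run to completion
        return coro  # async caller: let them await it

    return wrapper


def awaitable_class_sketch(cls):
    """Illustrative only: wrap every `async def` defined directly on cls."""
    for name, attr in list(vars(cls).items()):
        if inspect.iscoroutinefunction(attr):
            setattr(cls, name, _sync_or_coro(attr))
    return cls


@awaitable_class_sketch
class ExampleAsyncCls:
    async def example_async(self):
        return "hello async world"

    def example_sync(self):
        return "hello non-async world"
```

This sketch only touches methods defined on the class itself, so inherited async methods would need separate handling.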
- async privex.helpers.asyncx.call_sys_async(proc, *args, write: Union[bytes, str] = None, **kwargs) → Tuple[bytes, bytes]
Async version of call_sys() - works exactly the same, other than needing to be await’d.
Runs a process proc with the arguments *args, optionally piping data (write) into the process’s stdin, then returns the stdout and stderr of the process.
By default, stdout and stdin are set to asyncio.subprocess.PIPE while stderr defaults to asyncio.subprocess.STDOUT. You can override these by passing new values as keyword arguments.
While it’s recommended to use the file descriptor types from the asyncio module, they’re generally just aliases to the types in subprocess, meaning subprocess.PIPE should work the same as asyncio.subprocess.PIPE.
Simple Example:
>>> from privex.helpers import call_sys_async, stringify
>>> # All arguments are automatically quoted if required, so spaces are completely fine.
>>> folders, _ = await call_sys_async('ls', '-la', '/tmp/spaces are fine/hello world')
>>> print(stringify(folders))
total 0
drwxr-xr-x  3 user  wheel  96  6 Dec 17:46 .
drwxr-xr-x  3 user  wheel  96  6 Dec 17:46 ..
-rw-r--r--  1 user  wheel   0  6 Dec 17:46 example
Piping data into a process:
>>> data = "hello world"
>>> # The data "hello world" will be piped into wc's stdin, and wc's stdout + stderr will be returned
>>> out, _ = await call_sys_async('wc', '-c', write=data)
>>> int(out)
11
- Parameters
proc – The process to run
args – Arguments to pass to the process
write (bytes|str) – Optional data to pipe into the process’s stdin
- Key stdout
The subprocess file descriptor for stdout, e.g. asyncio.subprocess.PIPE or asyncio.subprocess.STDOUT
- Key stderr
The subprocess file descriptor for stderr, e.g. asyncio.subprocess.PIPE or asyncio.subprocess.STDOUT
- Key stdin
The subprocess file descriptor for stdin, e.g. asyncio.subprocess.PIPE
- Key cwd
Set the current/working directory of the process to this path, instead of the CWD of your calling script.
- Return tuple output
A tuple containing the process output of stdout and stderr
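For readers unfamiliar with asyncio subprocesses, the behaviour described above maps closely onto asyncio.create_subprocess_exec plus communicate(). The following is a minimal sketch with the same defaults, not the library's implementation; call_sys_async_sketch is a hypothetical name, and unlike the documented function it returns whatever communicate() returns.

```python
import asyncio
import sys


async def call_sys_async_sketch(proc, *args, write=None, **kwargs):
    """Illustrative only: run `proc`, optionally feeding `write` to its stdin."""
    # Same defaults as described above; callers may override via kwargs
    kwargs.setdefault("stdout", asyncio.subprocess.PIPE)
    kwargs.setdefault("stdin", asyncio.subprocess.PIPE)
    kwargs.setdefault("stderr", asyncio.subprocess.STDOUT)
    if isinstance(write, str):
        write = write.encode()  # pipes carry bytes
    process = await asyncio.create_subprocess_exec(proc, *args, **kwargs)
    # communicate() sends the input, closes stdin and waits for the process.
    # With stderr redirected to stdout, the second item of the tuple is None.
    return await process.communicate(input=write)


async def count_chars(text):
    # Uses the current Python interpreter instead of `wc` for portability
    code = "import sys; print(len(sys.stdin.read()))"
    out, _ = await call_sys_async_sketch(sys.executable, "-c", code, write=text)
    return int(out)
```

Using create_subprocess_exec with separate arguments avoids shell parsing, which is why arguments containing spaces need no manual quoting in this sketch.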
- privex.helpers.asyncx.coro_thread_func(func: callable, *t_args, _output_queue: Optional[Union[queue.Queue, str]] = None, **t_kwargs)
This function is not intended to be called directly. It’s designed to be used as the target of a threading.Thread.
Runs the coroutine function func using a new event loop for the thread this function is running within, and relays the result or an exception (if one was raised) via the queue.Queue _output_queue
See the higher level run_coro_thread() for more info.
- Parameters
func (callable) – A reference to the async def coroutine function that you want to run
t_args – Positional arguments to pass-through to the coroutine function
_output_queue – (default: None) The queue.Queue to emit the result or raised exception through. This can also be set to None to disable transmitting the result/exception via a queue. This can also be set to the string "default", which means the result/exception will be transmitted via the asyncx private queue _coro_thread_queue
t_kwargs – Keyword arguments to pass-through to the coroutine function
- privex.helpers.asyncx.get_async_type(obj) → str
Detects if obj is an async object/function that needs to be awaited / called, whether it’s a synchronous callable, or whether it’s unknown (probably not async)
>>> def sync_func(hello, world=1): return f"sync hello: {hello} {world}"
>>> async def async_func(hello, world=1): return f"async hello: {hello} {world}"
>>> get_async_type(async_func)
'coro func'
>>> get_async_type(async_func(5))
'coro'
>>> get_async_type(sync_func)
'sync func'
>>> get_async_type(sync_func(10))
'unknown'
- Parameters
obj (Any) – Object to check for async type
- Return str async_type
Either 'coro func', 'coro', 'awaitable', 'sync func' or 'unknown'
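The five return values listed above line up with checks available in the inspect module. The sketch below shows one plausible ordering of those checks; it is an assumption rather than the library's source, and get_async_type_sketch is a hypothetical name.

```python
import inspect


def get_async_type_sketch(obj) -> str:
    """Illustrative only: classify obj using standard inspect checks."""
    if inspect.iscoroutinefunction(obj):
        return "coro func"   # an `async def` reference that still needs calling
    if inspect.iscoroutine(obj):
        return "coro"        # a coroutine object that still needs awaiting
    if inspect.isawaitable(obj):
        return "awaitable"   # e.g. a Future or an object defining __await__
    if callable(obj):
        return "sync func"   # an ordinary callable
    return "unknown"         # plain data, probably not async
```

The order matters: coroutine functions are callable and coroutines are awaitable, so the more specific checks have to come first.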
- privex.helpers.asyncx.is_async_context() → bool
Returns True if currently in an async context, otherwise False
- privex.helpers.asyncx.loop_run(coro: Union[Coroutine, Type[Coroutine], Callable], *args, _loop=None, **kwargs) → Any
Run the coroutine or async function coro synchronously, using an AsyncIO event loop.
If the keyword argument _loop isn’t specified, it defaults to the loop returned by asyncio.get_event_loop()
If coro doesn’t appear to be a coroutine or async function:
- If coro is a normal callable object e.g. a function, then it’ll be called.
  - If the object returned after calling coro(*args, **kwargs) is a co-routine / async func, then it’ll call loop_run again, passing the object returned from calling it, and returning the result from that recursive call.
  - If the returned object isn’t an async func / co-routine, then the object will be returned as-is.
- Otherwise, coro will just be returned back to the caller.
Example Usage
First we’ll define the async function some_func to use as an example:
>>> async def some_func(x, y):
...     return x + y
Option 1 - Call an async function directly with any args/kwargs required, then pass the coroutine returned:
>>> loop_run(some_func(3, 4))
7
Option 2 - Pass a reference to the async function, and pass any required args/kwargs straight to loop_run() - the function will be run with the args/kwargs you provide, then the coroutine run in an event loop:
>>> loop_run(some_func, 10, y=20)    # Opt 2. Pass the async function and include any args/kwargs for the call
30
- Parameters
coro – A co-routine, or reference to an async function to be run synchronously
args – Any positional arguments to pass to coro (if it’s a function reference and not a coroutine)
_loop (asyncio.base_events.BaseEventLoop) – (kwarg only!) If passed, will run coro in this event loop, instead of asyncio.get_event_loop()
kwargs – Any keyword arguments to pass to coro (if it’s a function reference and not a coroutine)
- Return Any coro_result
The returned data from executing the coroutine / async function
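The decision tree above can be sketched in a few lines. Note one deliberate difference, which is an assumption made for portability across Python versions: when _loop is not given, this sketch creates and closes a fresh event loop instead of using asyncio.get_event_loop(). loop_run_sketch is a hypothetical name.

```python
import asyncio
import inspect


def loop_run_sketch(coro, *args, _loop=None, **kwargs):
    """Illustrative only: run a coroutine / async function synchronously."""
    if inspect.iscoroutinefunction(coro):
        # Function reference: call it to obtain the coroutine object
        coro = coro(*args, **kwargs)
    if inspect.iscoroutine(coro):
        if _loop is not None:
            return _loop.run_until_complete(coro)
        loop = asyncio.new_event_loop()  # assumption: fresh loop per call
        try:
            return loop.run_until_complete(coro)
        finally:
            loop.close()
    if callable(coro):
        # Plain callable: call it, then recurse if it produced async work
        result = coro(*args, **kwargs)
        if inspect.iscoroutine(result) or inspect.iscoroutinefunction(result):
            return loop_run_sketch(result, _loop=_loop)
        return result
    # Neither async nor callable: return unchanged
    return coro


async def some_func(x, y):
    return x + y
```

The recursive branch is what lets a synchronous wrapper that merely returns a coroutine be treated the same way as the async function it wraps.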
- privex.helpers.asyncx.run_coro_thread(func: callable, *args, **kwargs) → Any
Run a Python AsyncIO coroutine function within a new event loop using a thread, and return the result / raise any exceptions as if it were run normally within an AsyncIO function.
Caution
If you want to run a coroutine within a thread from an AsyncIO function/method, then you should use run_coro_thread_async() instead, which uses asyncio.sleep() while waiting for a result/exception to be transmitted via a queue.
This allows you to run and wait for multiple coroutine threads simultaneously, as there’s no synchronous blocking wait - unlike this function.
This will usually allow you to run coroutines from a synchronous function without running into the dreaded “Event loop is already running” error - since the coroutine will be run inside of a thread with its own dedicated event loop.
Example Usage:
>>> async def example_func(lorem: int, ipsum: int):
...     if lorem > 100: raise AttributeError("lorem is greater than 100!")
...     return f"example: {lorem + ipsum}"
>>> run_coro_thread(example_func, 10, 20)
'example: 30'
>>> run_coro_thread(example_func, 3, ipsum=6)
'example: 9'
>>> run_coro_thread(example_func, lorem=40, ipsum=1)
'example: 41'
>>> run_coro_thread(example_func, 120, 50)
File "", line 2, in example_func
    if lorem > 100: raise AttributeError("lorem is greater than 100!")
AttributeError: lorem is greater than 100!
Creates a new threading.Thread with the target coro_thread_func() (via run_coro_thread_base()), passing the coroutine func along with the passed positional args and keyword kwargs, which creates a new event loop, and then runs func within that thread event loop.
Uses the private queue.Queue threading queue _coro_thread_queue to safely relay back to the calling thread - either the result from the coroutine, or an exception if one was raised while trying to run the coroutine.
- Parameters
func (callable) – A reference to the async def coroutine function that you want to run
args – Positional arguments to pass-through to the coroutine function
kwargs – Keyword arguments to pass-through to the coroutine function
- Return Any coro_res
The result returned from the coroutine func
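The thread-plus-queue mechanism described above can be illustrated with the standard library. This sketch is not the library's code: it uses a fresh queue per call rather than the private _coro_thread_queue, and run_coro_thread_sketch and _thread_target are hypothetical names.

```python
import asyncio
import queue
import threading


def _thread_target(func, out_q, args, kwargs):
    # Each thread gets its own event loop, so no "already running" clash
    loop = asyncio.new_event_loop()
    try:
        out_q.put((True, loop.run_until_complete(func(*args, **kwargs))))
    except Exception as exc:
        out_q.put((False, exc))  # relay the exception to the caller
    finally:
        loop.close()


def run_coro_thread_sketch(func, *args, **kwargs):
    """Illustrative only: run coroutine function `func` in a worker thread."""
    out_q = queue.Queue()  # assumption: one queue per call
    worker = threading.Thread(target=_thread_target, args=(func, out_q, args, kwargs))
    worker.start()
    ok, payload = out_q.get()  # blocks the calling thread until done
    worker.join()
    if not ok:
        raise payload  # re-raise in the calling thread
    return payload


async def example_func(lorem: int, ipsum: int):
    if lorem > 100:
        raise AttributeError("lorem is greater than 100!")
    return f"example: {lorem + ipsum}"
```

The blocking out_q.get() is the synchronous wait that the caution above refers to, and it is the reason this approach is unsuitable inside async code.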
- async privex.helpers.asyncx.run_coro_thread_async(func: callable, *args, _queue_timeout=30.0, _queue_sleep=0.05, **kwargs) → Any
AsyncIO version of run_coro_thread() which uses asyncio.sleep() while waiting on a result from the queue, allowing you to run multiple AsyncIO coroutines which call blocking synchronous code simultaneously, e.g. by using asyncio.gather()
Below is an example of running an example coroutine hello which runs the synchronous blocking time.sleep. Using run_coro_thread_async() plus asyncio.gather() we can run hello 4 times simultaneously, despite the use of the blocking time.sleep().
Basic usage:
>>> import asyncio
>>> import time
>>> from privex.helpers.asyncx import run_coro_thread_async
>>> async def hello(world):
...     time.sleep(1)
...     return world * 10
>>> await asyncio.gather(run_coro_thread_async(hello, 5), run_coro_thread_async(hello, 15),
...                      run_coro_thread_async(hello, 90), run_coro_thread_async(hello, 25))
[50, 150, 900, 250]
- Parameters
func (callable) – A reference to the async def coroutine function that you want to run
args – Positional arguments to pass-through to the coroutine function
kwargs – Keyword arguments to pass-through to the coroutine function
_queue_timeout (float|int) – (default: 30) Maximum number of seconds to wait for a result or exception from func before giving up.
_queue_sleep – (default: 0.05) Amount of time to AsyncIO sleep between each check of the result queue
- Return Any coro_res
The result returned from the coroutine func
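The non-blocking wait described above amounts to polling a queue and sleeping with asyncio.sleep() between checks. The sketch below assumes a per-call queue and raises the built-in TimeoutError on expiry; both choices are assumptions, as are the names run_coro_thread_async_sketch and _run_in_new_loop.

```python
import asyncio
import queue
import threading
import time


def _run_in_new_loop(func, out_q, args, kwargs):
    loop = asyncio.new_event_loop()
    try:
        out_q.put((True, loop.run_until_complete(func(*args, **kwargs))))
    except Exception as exc:
        out_q.put((False, exc))
    finally:
        loop.close()


async def run_coro_thread_async_sketch(func, *args, _queue_timeout=30.0,
                                       _queue_sleep=0.05, **kwargs):
    """Illustrative only: await a coroutine that runs in its own thread."""
    out_q = queue.Queue()
    threading.Thread(target=_run_in_new_loop,
                     args=(func, out_q, args, kwargs), daemon=True).start()
    deadline = time.monotonic() + _queue_timeout
    while True:
        try:
            ok, payload = out_q.get_nowait()  # never blocks the event loop
            break
        except queue.Empty:
            if time.monotonic() >= deadline:
                raise TimeoutError("no result before _queue_timeout elapsed")
            await asyncio.sleep(_queue_sleep)  # yield to other tasks
    if not ok:
        raise payload
    return payload


async def hello(world):
    time.sleep(0.2)  # deliberately blocking call
    return world * 10


async def run_all():
    return await asyncio.gather(*(run_coro_thread_async_sketch(hello, n)
                                  for n in (5, 15, 90, 25)))
```

Because every wait goes through asyncio.sleep(), the four blocking calls overlap in their own threads while the main event loop stays responsive.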
- privex.helpers.asyncx.run_coro_thread_base(func: callable, *args, _daemon_thread=False, **kwargs) → threading.Thread
This is a wrapper function which runs coro_thread_func() within a thread, passing func, args and kwargs to it.
See the higher level run_coro_thread() for more info.
- Parameters
func (callable) – A reference to the async def coroutine function that you want to run
args – Positional arguments to pass-through to the coroutine function
_daemon_thread (bool) – (Default: False) Must be specified as a kwarg. Controls whether or not the generated threading.Thread is set as a daemon thread.
kwargs – Keyword arguments to pass-through to the coroutine function
_output_queue (queue.Queue) – A queue.Queue for coro_thread_func() to transmit the coroutine’s result, or any raised exceptions, via.
- Return threading.Thread t_co
A started (but not joined) thread object for your caller to manage.
- privex.helpers.asyncx.run_sync(func, *args, **kwargs)
Run an async function synchronously (useful for REPL testing async functions). (TIP: Consider using loop_run() instead)
Attention
For most cases, you should use the function loop_run() instead of this. Unlike run_sync, loop_run() is able to handle async function references, coroutines, as well as coroutines / async functions which are wrapped in an outer non-async function (e.g. an @awaitable wrapper).
loop_run() also supports using a custom event loop, instead of being limited to asyncio.get_event_loop()
Usage:
>>> async def my_async_func(a, b, x=None, y=None):
...     return a, b, x, y
>>>
>>> run_sync(my_async_func, 1, 2, x=3, y=4)
(1, 2, 3, 4)
- Parameters
func (callable) – An asynchronous function to run
args – Positional arguments to pass to func
kwargs – Keyword arguments to pass to func