AsyncIO Helpers / Wrappers
Functions and classes related to working with Python’s native asyncio support
To avoid issues with the async keyword, this file is named asyncx instead of async
Copyright:
+===================================================+
| © 2019 Privex Inc. |
| https://www.privex.io |
+===================================================+
| |
| Originally Developed by Privex Inc. |
| License: X11 / MIT |
| |
| Core Developer(s): |
| |
| (+) Chris (@someguy123) [Privex] |
| (+) Kale (@kryogenic) [Privex] |
| |
+===================================================+
Copyright 2019 Privex Inc. ( https://www.privex.io )
Functions
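async_sync() | Async Synchronous Decorator, borrowed from https://stackoverflow.com/a/23036785/2648583 - added this PyDoc comment and support for returning data from a synchronous function
await_if_needed() | Call, await, and/or simply return func depending on whether it’s an async function reference (coroutine function), a non-awaited coroutine, a standard synchronous function, or just a plain old string.
awaitable() | Decorator which helps with creation of async wrapper functions.
awaitable_class() | Wraps a class, allowing all async methods to be used in non-async code as if they were normal synchronous methods.
call_sys_async() | Async version of call_sys() - works exactly the same, other than needing to be await’d.
coro_thread_func() | This function is not intended to be called directly.
get_async_type() | Detects if obj is an async object/function that needs to be awaited / called, whether it’s a synchronous callable, or whether it’s unknown (probably not async)
is_async_context() | Returns True if currently in an async context, otherwise False
loop_run() | Run the coroutine or async function coro synchronously, using an AsyncIO event loop.
run_coro_thread() | Run a Python AsyncIO coroutine function within a new event loop using a thread, and return the result / raise any exceptions as if it were run normally within an AsyncIO function.
run_coro_thread_async() | AsyncIO version of run_coro_thread() which uses asyncio.sleep() while waiting on a result from the queue.
run_coro_thread_base() | This is a wrapper function which runs coro_thread_func() within a thread, passing func, args and kwargs to it.
run_sync() | Run an async function synchronously (useful for REPL testing async functions).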
Classes
aobject | Inheriting this class allows you to define an async __init__.
-
privex.helpers.asyncx.AWAITABLE_BLACKLIST: List[str] = []
A list of fully qualified module paths to functions/methods. If any of these functions/methods call an awaitable() decorated function/method, then the awaitable will be run synchronously regardless of whether there’s an active AsyncIO context or not.
-
privex.helpers.asyncx.AWAITABLE_BLACKLIST_FUNCS: List[str] = []
A list of plain function names - for which awaitable() decorated functions/methods should always run synchronously.
-
privex.helpers.asyncx.AWAITABLE_BLACKLIST_MODS: List[str] = []
A list of fully qualified module names - for which awaitable() decorated functions/methods should always run synchronously.
-
class privex.helpers.asyncx.aobject(*a, **kw)
Inheriting this class allows you to define an async __init__.
To use async constructors, you must construct your class using await MyClass(params)
Example:
>>> class SomeClass(aobject):
...     async def __init__(self, some_param='x'):
...         self.some_param = some_param
...         self.example = await self.test_async()
...
...     async def test_async(self):
...         return "hello world"
...
>>> async def main():
...     some_class = await SomeClass('testing')
...     print(some_class.example)
...
Note: Some IDEs like PyCharm may complain about having async __new__ and __init__, but it does work with Python 3.6+.
You may be able to work around the syntax error in your sub-class by defining your __init__ method under a different name, and then assigning __init__ = _your_real_init much like this class does.
Original source: https://stackoverflow.com/a/45364670
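The mechanism behind an awaitable constructor can be illustrated with a minimal, standard-library-only sketch. The base class name AsyncInitBase is hypothetical and this is not necessarily how aobject itself is written; it only shows the general idea of making __new__ a coroutine so that `await SomeClass(...)` runs an async __init__.

```python
import asyncio


class AsyncInitBase:
    """Hypothetical stand-in for aobject: ``await Cls(...)`` runs an async __init__."""

    async def __new__(cls, *args, **kwargs):
        # Calling the class returns this coroutine instead of an instance,
        # so Python does not call __init__ automatically; we await it ourselves.
        instance = super().__new__(cls)
        await instance.__init__(*args, **kwargs)
        return instance

    async def __init__(self, *args, **kwargs):
        pass


class SomeClass(AsyncInitBase):
    async def __init__(self, some_param='x'):
        self.some_param = some_param
        self.example = await self.test_async()

    async def test_async(self):
        return "hello world"


async def main():
    some_class = await SomeClass('testing')
    return some_class.some_param, some_class.example


print(asyncio.run(main()))
```

Because the class call returns a coroutine rather than an instance, forgetting the `await` leaves you holding an un-awaited coroutine, which is why construction must always go through `await`.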
-
privex.helpers.asyncx.async_sync(f)
Async Synchronous Decorator, borrowed from https://stackoverflow.com/a/23036785/2648583 - added this PyDoc comment and support for returning data from a synchronous function
Allows a non-async function to run async functions using yield from - and can also return data.
Useful for unit testing, since unittest.TestCase functions are synchronous.
Example async function:
>>> async def my_async_func(a, b, x=None, y=None):
...     return a, b, x, y
...
Using the above async function with a non-async function:
>>> @async_sync
... def sync_function():
...     result = yield from my_async_func(1, 2, x=3, y=4)
...     return result
...
>>> r = sync_function()
>>> print(r)
(1, 2, 3, 4)
>>> print(r[1])
2
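For the unit-testing use case mentioned above, a related pattern that needs only the standard library is sketched below. It is a different technique from async_sync (it wraps an `async def` test and drives it with asyncio.run() rather than using `yield from`), and the decorator name run_async is hypothetical.

```python
import asyncio
import functools
import unittest


def run_async(test_func):
    """Hypothetical decorator: run an ``async def`` test to completion in a fresh event loop."""
    @functools.wraps(test_func)
    def wrapper(*args, **kwargs):
        # asyncio.run() creates a new event loop, runs the coroutine, then closes the loop.
        return asyncio.run(test_func(*args, **kwargs))
    return wrapper


async def my_async_func(a, b, x=None, y=None):
    return a, b, x, y


class ExampleTest(unittest.TestCase):
    @run_async
    async def test_returns_all_arguments(self):
        result = await my_async_func(1, 2, x=3, y=4)
        self.assertEqual(result, (1, 2, 3, 4))
        self.assertEqual(result[1], 2)
```

The test runner only ever sees the synchronous wrapper, so the test case can stay a plain unittest.TestCase.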
-
async privex.helpers.asyncx.await_if_needed(func: Union[callable, Coroutine, Awaitable, Any], *args, **kwargs)
Call, await, and/or simply return func depending on whether it’s an async function reference (coroutine function), a non-awaited coroutine, a standard synchronous function, or just a plain old string.
Helps take the guesswork out of parameters which could be a string, a synchronous function, an async function, or a coroutine which hasn’t been awaited.
>>> def sync_func(hello, world=1):
...     return f"sync hello: {hello} {world}"
>>> async def async_func(hello, world=1):
...     return f"async hello: {hello} {world}"
>>> await await_if_needed(sync_func, 3, world=2)
'sync hello: 3 2'
>>> await await_if_needed(async_func, 5, 4)
'async hello: 5 4'
>>> f = async_func(5, 4)
>>> await await_if_needed(f)
'async hello: 5 4'
- Parameters
func (callable|Coroutine|Awaitable|Any) – The function/object to await/call if needed.
args – If func is a function/method, will forward any positional arguments to the function
kwargs – If func is a function/method, will forward any keyword arguments to the function
- Return Any func_data
The result of the awaited func, or the original func if not a coroutine nor callable/awaitable
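The dispatch described above can be approximated with the standard inspect module. The following is a minimal sketch under that assumption; the name await_if_needed_sketch is hypothetical and the real function may order or implement its checks differently.

```python
import asyncio
import inspect


async def await_if_needed_sketch(func, *args, **kwargs):
    """Illustrative approximation only, not the library's implementation."""
    if inspect.iscoroutinefunction(func):
        # Reference to an ``async def`` function: call it, then await the coroutine.
        return await func(*args, **kwargs)
    if inspect.isawaitable(func):
        # Already a coroutine / awaitable object: just await it.
        return await func
    if callable(func):
        # Plain synchronous callable: call it, and await the result only if needed.
        result = func(*args, **kwargs)
        if inspect.isawaitable(result):
            return await result
        return result
    # Anything else (e.g. a string) is returned unchanged.
    return func


def sync_func(hello, world=1):
    return f"sync hello: {hello} {world}"


async def async_func(hello, world=1):
    return f"async hello: {hello} {world}"


async def demo():
    return [
        await await_if_needed_sketch(sync_func, 3, world=2),
        await await_if_needed_sketch(async_func, 5, 4),
        await await_if_needed_sketch(async_func(5, 4)),
        await await_if_needed_sketch("just a string"),
    ]
```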
-
privex.helpers.asyncx.awaitable(func: Callable) → Callable
Decorator which helps with creation of async wrapper functions.
Usage
Define your async function as normal, then create a standard python function using this decorator - the function should just call your async function and return it.
>>> async def some_func_async(a: str, b: str):
...     c = a + b
...     return c
...
>>> @awaitable
... def some_func(a, b) -> Union[str, Coroutine[Any, Any, str]]:
...     return some_func_async(a, b)
...
Now, inside of async functions, we just await the wrapper function as if it were the original async function.
>>> async def my_async_func():
...     res = await some_func("hello", "world")
...
While inside of synchronous functions, we call the wrapper function as if it were a normal synchronous function. The decorator will create an asyncio event loop, run the function, then return the result - transparent to the calling function.
>>> def my_sync_func():
...     res = some_func("hello", "world")
...
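To make the two call paths concrete, here is a minimal sketch of such a decorator built only on the standard library. It assumes async-context detection via asyncio.get_running_loop(), which is a simplification chosen for this example; the name awaitable_sketch is hypothetical and this is not the library's actual implementation.

```python
import asyncio
import functools


def awaitable_sketch(func):
    """Hypothetical stand-in for @awaitable, for illustration only."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        coro = func(*args, **kwargs)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No event loop is running in this thread, so the caller is
            # synchronous: run the coroutine to completion and return its result.
            return asyncio.run(coro)
        # A loop is running: hand the coroutine back for the caller to await.
        return coro
    return wrapper


async def some_func_async(a: str, b: str) -> str:
    return a + b


@awaitable_sketch
def some_func(a, b):
    return some_func_async(a, b)


async def my_async_func():
    return await some_func("hello", "world")


def my_sync_func():
    return some_func("hello", "world")
```

Note that a synchronous helper called from inside a running event loop would still receive a coroutine from this sketch, since a loop is running in its thread.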
Blacklists
If you mix a lot of synchronous and asynchronous code, the sniffio-based async detection may cause coroutines to be returned to synchronous functions that were called from asynchronous functions, which can of course cause problems.
To avoid this issue, you can blacklist function names, module names (and their sub-modules), and/or fully qualified module paths to functions/methods.
Three blacklists are available in this module, which allow you to specify caller functions/methods, modules, or fully qualified module paths to functions/methods for which awaitable() wrapped functions/methods should always execute in an event loop and return synchronously.
Example:
>>> from privex.helpers import asyncx
>>> # All code within the module 'some.module' and its sub-modules will always have awaitable's run their wrapped
>>> # functions synchronously.
>>> asyncx.AWAITABLE_BLACKLIST_MODS += ['some.module']
>>> # Whenever a function with the name 'example_func' (in any module) calls an awaitable, it will always run synchronously
>>> asyncx.AWAITABLE_BLACKLIST_FUNCS += ['example_func']
>>> # Whenever the specific class method 'other.module.SomeClass.some_sync' calls an awaitable, it will always run synchronously.
>>> asyncx.AWAITABLE_BLACKLIST += ['other.module.SomeClass.some_sync']
Original source: https://github.com/encode/httpx/issues/572#issuecomment-562179966
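One way a caller blacklist could work is sketched below, using only the standard library. Everything here is an assumption made for illustration: the list BLACKLISTED_CALLERS stands in for AWAITABLE_BLACKLIST_FUNCS, the caller is identified with inspect.stack(), and the forced-synchronous path runs the coroutine in a worker thread with its own event loop. The library's real lookup and execution strategy may differ.

```python
import asyncio
import concurrent.futures
import functools
import inspect

# Hypothetical stand-in for AWAITABLE_BLACKLIST_FUNCS (plain caller function names).
BLACKLISTED_CALLERS = ['legacy_sync_caller']


def _run_in_worker_thread(coro):
    # A loop is already running in this thread, so asyncio.run() cannot be used
    # here directly. Run the coroutine in a separate thread with its own loop.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


def awaitable_with_blacklist(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        coro = func(*args, **kwargs)
        # stack()[0] is this wrapper's frame; stack()[1] is whoever called it.
        caller_name = inspect.stack()[1].function
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(coro)          # plain synchronous context
        if caller_name in BLACKLISTED_CALLERS:
            return _run_in_worker_thread(coro)  # forced synchronous result
        return coro                           # async context: caller awaits
    return wrapper


async def add_async(a, b):
    return a + b


@awaitable_with_blacklist
def add(a, b):
    return add_async(a, b)


def legacy_sync_caller():
    # Synchronous code that expects a value, even when called from async code.
    return add(1, 2) + 10


async def main():
    return legacy_sync_caller()
```

Without the blacklist entry, legacy_sync_caller() would receive a coroutine when invoked from main() and the addition would fail. The worker-thread path blocks the running event loop until the coroutine finishes, which is the trade-off for returning a plain value.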
-
privex.helpers.asyncx.awaitable_class(cls: Type[T]) → Type[T]
Wraps a class, allowing all async methods to be used in non-async code as if they were normal synchronous methods.
Example Usage
Simply decorate your class with @awaitable_class (no brackets! takes no arguments), and once you create an instance of your class, all of your async methods can be used by synchronous code as if they were plain functions:
>>> from privex.helpers import awaitable_class
>>>
>>> @awaitable_class
... class ExampleAsyncCls:
...     async def example_async(self):
...         return "hello async world"
...
...     def example_sync(self):
...         return "hello non-async world"
...
NOTE - You can also wrap a class without using a decorator - just pass the class as the first argument like so:
>>> class _OtherExample:
...     async def hello(self):
...         return 'world'
>>> OtherExample = awaitable_class(_OtherExample)
If we call .example_async() on the ExampleAsyncCls class above from a synchronous REPL, it will return 'hello async world' as if it were a normal synchronous method. We can also call the non-async .example_sync() which works like normal:
>>> k = ExampleAsyncCls()
>>> k.example_async()
'hello async world'
>>> k.example_sync()
'hello non-async world'
However, inside of an async context (e.g. an async function), awaitable_class will be returning coroutines, so you should await the methods, as you would expect when dealing with an async function:
>>> async def test_async():
...     exmp = ExampleAsyncCls()
...     return await exmp.example_async()
...
>>> await test_async()
'hello async world'
- Parameters
cls (type) – The class to wrap
- Return type wrapped_class
The class after being wrapped
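A class decorator with this behaviour can be sketched with the standard library alone. The names awaitable_class_sketch and _sync_or_coro are hypothetical, and the sketch only wraps coroutine methods defined directly on the class (not inherited ones, static methods or class methods); the library's own wrapper may work differently.

```python
import asyncio
import functools
import inspect


def _sync_or_coro(method):
    """Return the result directly in sync code, or a coroutine in async code."""
    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        coro = method(*args, **kwargs)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(coro)  # synchronous caller
        return coro                   # async caller awaits it
    return wrapper


def awaitable_class_sketch(cls):
    # Replace each ``async def`` method defined on the class with a wrapped version.
    for name, attr in list(vars(cls).items()):
        if inspect.iscoroutinefunction(attr):
            setattr(cls, name, _sync_or_coro(attr))
    return cls


@awaitable_class_sketch
class ExampleAsyncCls:
    async def example_async(self):
        return "hello async world"

    def example_sync(self):
        return "hello non-async world"


async def test_async():
    exmp = ExampleAsyncCls()
    return await exmp.example_async()
```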
-
async privex.helpers.asyncx.call_sys_async(proc, *args, write: Union[bytes, str] = None, **kwargs) → Tuple[bytes, bytes]
Async version of call_sys() - works exactly the same, other than needing to be await’d. Run a process proc with the arguments *args, optionally piping data (write) into the process’s stdin - then returns the stdout and stderr of the process.
By default, stdout and stdin are set to asyncio.PIPE while stderr defaults to asyncio.STDOUT. You can override these by passing new values as keyword arguments.
While it’s recommended to use the file descriptor types from the asyncio module, they’re generally just aliases to the types in subprocess, meaning subprocess.PIPE should work the same as asyncio.PIPE.
Simple Example:
>>> from privex.helpers import call_sys_async, stringify
>>> # All arguments are automatically quoted if required, so spaces are completely fine.
>>> folders, _ = await call_sys_async('ls', '-la', '/tmp/spaces are fine/hello world')
>>> print(stringify(folders))
total 0
drwxr-xr-x  3 user  wheel  96  6 Dec 17:46 .
drwxr-xr-x  3 user  wheel  96  6 Dec 17:46 ..
-rw-r--r--  1 user  wheel   0  6 Dec 17:46 example
Piping data into a process:
>>> data = "hello world"
>>> # The data "hello world" will be piped into wc's stdin, and wc's stdout + stderr will be returned
>>> out, _ = await call_sys_async('wc', '-c', write=data)
>>> int(out)
11
- Parameters
- Key stdout
The subprocess file descriptor for stdout, e.g. asyncio.PIPE or asyncio.STDOUT
- Key stderr
The subprocess file descriptor for stderr, e.g. asyncio.PIPE or asyncio.STDOUT
- Key stdin
The subprocess file descriptor for stdin, e.g. asyncio.PIPE
- Key cwd
Set the current/working directory of the process to this path, instead of the CWD of your calling script.
- Return tuple output
A tuple containing the process output of stdout and stderr
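The behaviour described for this function maps closely onto asyncio.create_subprocess_exec() and Process.communicate() from the standard library. The sketch below assumes that mapping; run_process and count_bytes are hypothetical names, and the Python interpreter is used as the child process so the example does not depend on wc being installed.

```python
import asyncio
import sys


async def run_process(proc, *args, write=None, **kwargs):
    """Illustrative approximation only, not the library's implementation."""
    # Same defaults as described above: pipe stdin/stdout, merge stderr into stdout.
    stdout = kwargs.pop('stdout', asyncio.subprocess.PIPE)
    stderr = kwargs.pop('stderr', asyncio.subprocess.STDOUT)
    stdin = kwargs.pop('stdin', asyncio.subprocess.PIPE)
    handle = await asyncio.create_subprocess_exec(
        proc, *args, stdout=stdout, stderr=stderr, stdin=stdin, **kwargs
    )
    if isinstance(write, str):
        write = write.encode('utf-8')
    # communicate() writes the input (if any), closes stdin, and waits for exit.
    return await handle.communicate(input=write)


async def count_bytes(data):
    # Child process: count the bytes received on stdin (a portable "wc -c").
    script = "import sys; print(len(sys.stdin.buffer.read()))"
    out, _ = await run_process(sys.executable, '-c', script, write=data)
    return int(out.decode().strip())
```

Because the arguments are passed as a list to the exec-style call, no shell is involved, so arguments containing spaces do not need quoting.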
-
privex.helpers.asyncx.coro_thread_func(func: callable, *t_args, _output_queue: Optional[Union[queue.Queue, str]] = None, **t_kwargs)
This function is not intended to be called directly. It’s designed to be used as the target of a threading.Thread.
Runs the coroutine function func using a new event loop for the thread this function is running within, and relays the result or an exception (if one was raised) via the queue.Queue _output_queue.
See the higher level run_coro_thread() for more info.
- Parameters
func (callable) – A reference to the async def coroutine function that you want to run
t_args – Positional arguments to pass-through to the coroutine function
_output_queue – (default: None) The queue.Queue to emit the result or raised exception through. This can also be set to None to disable transmitting the result/exception via a queue.
This can also be set to the string "default", which means the result/exception will be transmitted via the asyncx private queue _coro_thread_queue
t_kwargs – Keyword arguments to pass-through to the coroutine function
-
privex.helpers.asyncx.get_async_type(obj) → str
Detects if obj is an async object/function that needs to be awaited / called, whether it’s a synchronous callable, or whether it’s unknown (probably not async)
>>> def sync_func(hello, world=1): return f"sync hello: {hello} {world}"
>>> async def async_func(hello, world=1): return f"async hello: {hello} {world}"
>>> get_async_type(async_func)
'coro func'
>>> get_async_type(async_func(5))
'coro'
>>> get_async_type(sync_func)
'sync func'
>>> get_async_type(sync_func(10))
'unknown'
- Parameters
obj (Any) – Object to check for async type
- Return str async_type
Either 'coro func', 'coro', 'awaitable', 'sync func' or 'unknown'
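A comparable classification can be written with the standard inspect module. This is a sketch that reuses the same five labels; the function name classify_async is hypothetical and the order of checks in the real function is an assumption.

```python
import inspect


def classify_async(obj) -> str:
    """Illustrative approximation only, not the library's implementation."""
    if inspect.iscoroutinefunction(obj):
        return 'coro func'   # an ``async def`` function reference
    if inspect.iscoroutine(obj):
        return 'coro'        # the object returned by calling an ``async def``
    if inspect.isawaitable(obj):
        return 'awaitable'   # e.g. a Future, or any object defining __await__
    if callable(obj):
        return 'sync func'
    return 'unknown'


def sync_func(hello, world=1):
    return f"sync hello: {hello} {world}"


async def async_func(hello, world=1):
    return f"async hello: {hello} {world}"


coro = async_func(5)
labels = [
    classify_async(async_func),
    classify_async(coro),
    classify_async(sync_func),
    classify_async(sync_func(10)),
]
coro.close()  # the coroutine is never awaited here, so close it explicitly
print(labels)
```

The coroutine function check has to come before the generic callable check, since an `async def` function is also callable.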
-
privex.helpers.asyncx.is_async_context() → bool
Returns True if currently in an async context, otherwise False
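A standard-library approximation of this check is sketched below, assuming that "async context" means an asyncio event loop is running in the current thread. The name in_async_context is hypothetical and the library may detect the context by other means.

```python
import asyncio


def in_async_context() -> bool:
    """Illustrative approximation: is an asyncio event loop running in this thread?"""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises RuntimeError when no loop is running.
        return False
    return True


async def check_inside_loop() -> bool:
    return in_async_context()
```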
-
privex.helpers.asyncx.loop_run(coro: Union[Coroutine, Type[Coroutine], Callable], *args, _loop=None, **kwargs) → Any
Run the coroutine or async function coro synchronously, using an AsyncIO event loop.
If the keyword argument _loop isn’t specified, it defaults to the loop returned by asyncio.get_event_loop()
If coro doesn’t appear to be a coroutine or async function:
- If coro is a normal callable object e.g. a function, then it’ll be called.
    - If the object returned after calling coro(*args, **kwargs) is a co-routine / async func, then it’ll call loop_run again, passing the object returned from calling it, and returning the result from that recursive call.
    - If the returned object isn’t an async func / co-routine, then the object will be returned as-is.
- Otherwise, coro will just be returned back to the caller.
Example Usage
First we’ll define the async function some_func to use as an example:
>>> async def some_func(x, y):
...     return x + y
Option 1 - Call an async function directly with any args/kwargs required, then pass the coroutine returned:
>>> loop_run(some_func(3, 4))
7
Option 2 - Pass a reference to the async function, and pass any required args/kwargs straight to loop_run() - the function will be run with the args/kwargs you provide, then the coroutine run in an event loop:
>>> loop_run(some_func, 10, y=20)    # Opt 2. Pass the async function and include any args/kwargs for the call
30
- Parameters
coro – A co-routine, or reference to an async function to be run synchronously
args – Any positional arguments to pass to coro (if it’s a function reference and not a coroutine)
_loop (asyncio.base_events.BaseEventLoop) – (kwarg only!) If passed, will run coro in this event loop, instead of asyncio.get_event_loop()
kwargs – Any keyword arguments to pass to coro (if it’s a function reference and not a coroutine)
- Return Any coro_result
The returned data from executing the coroutine / async function
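The decision steps listed above can be sketched as follows, using only the standard library. The name loop_run_sketch is hypothetical, and one deliberate difference is that this sketch uses asyncio.run() (a fresh loop) when _loop is not given, rather than asyncio.get_event_loop().

```python
import asyncio
import inspect


def loop_run_sketch(coro, *args, _loop=None, **kwargs):
    """Illustrative approximation only, not the library's implementation."""
    if inspect.iscoroutinefunction(coro):
        # Async function reference: call it to obtain the coroutine object.
        coro = coro(*args, **kwargs)
    if inspect.iscoroutine(coro):
        if _loop is not None:
            return _loop.run_until_complete(coro)
        return asyncio.run(coro)
    if callable(coro):
        # Normal callable: call it, and recurse if it produced something async.
        result = coro(*args, **kwargs)
        if inspect.iscoroutine(result) or inspect.iscoroutinefunction(result):
            return loop_run_sketch(result, _loop=_loop)
        return result
    # Not async and not callable: return it unchanged.
    return coro


async def some_func(x, y):
    return x + y


def sync_wrapper(x, y):
    # A non-async function that returns a coroutine.
    return some_func(x, y)
```

Usage mirrors the two options above: `loop_run_sketch(some_func(3, 4))` passes a coroutine, while `loop_run_sketch(some_func, 10, y=20)` passes the function reference plus its arguments.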
-
privex.helpers.asyncx.run_coro_thread(func: callable, *args, **kwargs) → Any
Run a Python AsyncIO coroutine function within a new event loop using a thread, and return the result / raise any exceptions as if it were run normally within an AsyncIO function.
Caution
If you want to run a coroutine within a thread from an AsyncIO function/method, then you should use run_coro_thread_async() instead, which uses asyncio.sleep() while waiting for a result/exception to be transmitted via a queue.
This allows you to run and wait for multiple coroutine threads simultaneously, as there’s no synchronous blocking wait - unlike this function.
This will usually allow you to run coroutines from a synchronous function without running into the dreaded “Event loop is already running” error - since the coroutine will be run inside of a thread with its own dedicated event loop.
Example Usage:
>>> async def example_func(lorem: int, ipsum: int):
...     if lorem > 100: raise AttributeError("lorem is greater than 100!")
...     return f"example: {lorem + ipsum}"
>>> run_coro_thread(example_func, 10, 20)
'example: 30'
>>> run_coro_thread(example_func, 3, ipsum=6)
'example: 9'
>>> run_coro_thread(example_func, lorem=40, ipsum=1)
'example: 41'
>>> run_coro_thread(example_func, 120, 50)
File "", line 2, in example_func
    if lorem > 100: raise AttributeError("lorem is greater than 100!")
AttributeError: lorem is greater than 100!
Creates a new threading.Thread with the target coro_thread_func() (via run_coro_thread_base()), passing the coroutine func along with the passed positional args and keyword kwargs, which creates a new event loop, and then runs func within that thread event loop.
Uses the private queue.Queue threading queue _coro_thread_queue to safely relay back to the calling thread - either the result from the coroutine, or an exception if one was raised while trying to run the coroutine.
- Parameters
func (callable) – A reference to the async def coroutine function that you want to run
args – Positional arguments to pass-through to the coroutine function
kwargs – Keyword arguments to pass-through to the coroutine function
- Return Any coro_res
The result returned from the coroutine func
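The thread, event loop and queue arrangement described above can be sketched with the standard library. The names _thread_target and run_in_thread are hypothetical, and the sketch uses a per-call queue rather than a shared private one, so it should be read as an illustration of the pattern rather than the library's code.

```python
import asyncio
import queue
import threading


def _thread_target(func, out_q, args, kwargs):
    # Each thread gets its own event loop, independent of any loop in the caller.
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        out_q.put((True, loop.run_until_complete(func(*args, **kwargs))))
    except Exception as exc:
        # Relay the exception so it can be re-raised in the calling thread.
        out_q.put((False, exc))
    finally:
        loop.close()


def run_in_thread(func, *args, **kwargs):
    out_q = queue.Queue()
    thread = threading.Thread(target=_thread_target, args=(func, out_q, args, kwargs))
    thread.start()
    thread.join()  # synchronous, blocking wait for the thread to finish
    ok, value = out_q.get()
    if ok:
        return value
    raise value


async def example_func(lorem: int, ipsum: int):
    if lorem > 100:
        raise AttributeError("lorem is greater than 100!")
    return f"example: {lorem + ipsum}"


async def call_from_running_loop():
    # Works even though a loop is already running in this thread,
    # because the coroutine runs on a separate loop in another thread.
    return run_in_thread(example_func, 3, ipsum=6)
```

The blocking join() is what makes this unsuitable for waiting on several coroutine threads at once from async code, as the caution above points out.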
-
async privex.helpers.asyncx.run_coro_thread_async(func: callable, *args, _queue_timeout=30.0, _queue_sleep=0.05, **kwargs) → Any
AsyncIO version of run_coro_thread() which uses asyncio.sleep() while waiting on a result from the queue, allowing you to run multiple AsyncIO coroutines which call blocking synchronous code - simultaneously, e.g. by using asyncio.gather()
Below is an example of running an example coroutine hello which runs the synchronous blocking time.sleep. Using run_coro_thread_async() plus asyncio.gather() - we can run hello 4 times simultaneously, despite the use of the blocking time.sleep().
Basic usage:
>>> import asyncio
>>> import time
>>> from privex.helpers.asyncx import run_coro_thread_async
>>> async def hello(world):
...     time.sleep(1)
...     return world * 10
>>> await asyncio.gather(run_coro_thread_async(hello, 5), run_coro_thread_async(hello, 15),
...                      run_coro_thread_async(hello, 90), run_coro_thread_async(hello, 25))
[50, 150, 900, 250]
- Parameters
func (callable) – A reference to the async def coroutine function that you want to run
args – Positional arguments to pass-through to the coroutine function
kwargs – Keyword arguments to pass-through to the coroutine function
_queue_timeout (float|int) – (default: 30) Maximum number of seconds to wait for a result or exception from func before giving up.
_queue_sleep – (default: 0.05) Amount of time to AsyncIO sleep between each check of the result queue
- Return Any coro_res
The result returned from the coroutine func
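The polling approach described above can be sketched with the standard library: start a worker thread, then check the queue without blocking and yield to the event loop between checks. The names _worker and run_in_thread_async are hypothetical, and raising TimeoutError on expiry is an assumption made for this sketch, not documented behaviour.

```python
import asyncio
import queue
import threading
import time


def _worker(func, out_q, args, kwargs):
    try:
        # asyncio.run() gives this thread its own short-lived event loop.
        out_q.put((True, asyncio.run(func(*args, **kwargs))))
    except Exception as exc:
        out_q.put((False, exc))


async def run_in_thread_async(func, *args, _queue_timeout=30.0, _queue_sleep=0.05, **kwargs):
    out_q = queue.Queue()
    threading.Thread(target=_worker, args=(func, out_q, args, kwargs), daemon=True).start()
    deadline = time.monotonic() + _queue_timeout
    while True:
        try:
            ok, value = out_q.get_nowait()  # never blocks the event loop
            break
        except queue.Empty:
            if time.monotonic() >= deadline:
                raise TimeoutError("no result received before the timeout") from None
            await asyncio.sleep(_queue_sleep)  # let other tasks run meanwhile
    if ok:
        return value
    raise value


async def hello(world):
    time.sleep(0.3)  # blocking call, deliberately not awaited
    return world * 10


async def main():
    started = time.monotonic()
    results = await asyncio.gather(
        run_in_thread_async(hello, 5), run_in_thread_async(hello, 15),
        run_in_thread_async(hello, 90), run_in_thread_async(hello, 25),
    )
    return results, time.monotonic() - started
```

Since each blocking sleep happens in its own thread, the four calls overlap, and the total elapsed time should be well below the sum of the individual sleeps.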
-
privex.helpers.asyncx.run_coro_thread_base(func: callable, *args, _daemon_thread=False, **kwargs) → threading.Thread
This is a wrapper function which runs coro_thread_func() within a thread, passing func, args and kwargs to it.
See the higher level run_coro_thread() for more info.
- Parameters
func (callable) – A reference to the async def coroutine function that you want to run
args – Positional arguments to pass-through to the coroutine function
_daemon_thread (bool) – (Default: False) Must be specified as a kwarg. Controls whether or not the generated threading.Thread is set as a daemon thread.
kwargs – Keyword arguments to pass-through to the coroutine function
_output_queue (queue.Queue) – A queue.Queue for coro_thread_func() to transmit the coroutine’s result, or any raised exceptions via.
- Return threading.Thread t_co
A started (but not joined) thread object for your caller to manage.
-
privex.helpers.asyncx.run_sync(func, *args, **kwargs)
Run an async function synchronously (useful for REPL testing async functions). (TIP: Consider using loop_run() instead)
Attention
For most cases, you should use the function loop_run() instead of this. Unlike run_sync, loop_run() is able to handle async function references, coroutines, as well as coroutines / async functions which are wrapped in an outer non-async function (e.g. an @awaitable wrapper).
loop_run() also supports using a custom event loop, instead of being limited to asyncio.get_event_loop()
Usage:
>>> async def my_async_func(a, b, x=None, y=None):
...     return a, b, x, y
>>>
>>> run_sync(my_async_func, 1, 2, x=3, y=4)
(1, 2, 3, 4)
- Parameters
func (callable) – An asynchronous function to run
args – Positional arguments to pass to func
kwargs – Keyword arguments to pass to func