AsyncIO Helpers / Wrappers

Functions and classes related to working with Python’s native asyncio support

To avoid issues with the async keyword, this file is named asyncx instead of async

Copyright:

    +===================================================+
    |                 © 2019 Privex Inc.                |
    |               https://www.privex.io               |
    +===================================================+
    |                                                   |
    |        Originally Developed by Privex Inc.        |
    |        License: X11 / MIT                         |
    |                                                   |
    |        Core Developer(s):                         |
    |                                                   |
    |          (+)  Chris (@someguy123) [Privex]        |
    |          (+)  Kale (@kryogenic) [Privex]          |
    |                                                   |
    +===================================================+

Copyright 2019     Privex Inc.   ( https://www.privex.io )

Functions

async_sync(f)

Async Synchronous Decorator, borrowed from https://stackoverflow.com/a/23036785/2648583 - added this PyDoc comment and support for returning data from a synchronous function

await_if_needed(func, *args, **kwargs)

Call, await, and/or simply return func depending on whether it’s an async function reference (coroutine function), a non-awaited coroutine, a standard synchronous function, or just a plain old string.

awaitable(func)

Decorator which helps with creation of async wrapper functions.

awaitable_class(cls)

Wraps a class, allowing all async methods to be used in non-async code as if they were normal synchronous methods.

call_sys_async(proc, *args[, write])

Async version of call_sys() - works exactly the same, other than needing to be await’d.

coro_thread_func(func, *t_args[, _output_queue])

This function is not intended to be called directly.

get_async_type(obj)

Detects whether obj is an async object/function that needs to be awaited / called, a synchronous callable, or unknown (probably not async)

is_async_context()

Returns True if currently in an async context, otherwise False

loop_run(coro, *args[, _loop])

Run the coroutine or async function coro synchronously, using an AsyncIO event loop.

run_coro_thread(func, *args, **kwargs)

Run a Python AsyncIO coroutine function within a new event loop using a thread, and return the result / raise any exceptions as if it were run normally within an AsyncIO function.

run_coro_thread_async(func, *args[, …])

AsyncIO version of run_coro_thread() which uses asyncio.sleep() while waiting on a result from the queue, allowing you to run multiple AsyncIO coroutines which call blocking synchronous code - simultaneously, e.g.

run_coro_thread_base(func, *args[, …])

This is a wrapper function which runs coro_thread_func() within a thread, passing func, args and kwargs to it.

run_sync(func, *args, **kwargs)

Run an async function synchronously (useful for REPL testing async functions).

Classes

AwaitableMixin()

aobject(*a, **kw)

Inheriting this class allows you to define an async __init__.

privex.helpers.asyncx.AWAITABLE_BLACKLIST: List[str] = []

A list of fully qualified module paths to functions/methods. If any of these functions/methods calls an awaitable() decorated function/method, then the awaitable will be run synchronously, regardless of whether there’s an active AsyncIO context or not.

privex.helpers.asyncx.AWAITABLE_BLACKLIST_FUNCS: List[str] = []

A list of plain function names - for which awaitable() decorated function/methods should always run synchronously.

privex.helpers.asyncx.AWAITABLE_BLACKLIST_MODS: List[str] = []

A list of fully qualified module names - for which awaitable() decorated function/methods should always run synchronously.

class privex.helpers.asyncx.AwaitableMixin[source]

class privex.helpers.asyncx.aobject(*a, **kw)[source]

Inheriting this class allows you to define an async __init__.

To use async constructors, you must construct your class using await MyClass(params)

Example:

>>> class SomeClass(aobject):
...     async def __init__(self, some_param='x'):
...         self.some_param = some_param
...         self.example = await self.test_async()
...
...     async def test_async(self):
...         return "hello world"
...
>>> async def main():
...     some_class = await SomeClass('testing')
...     print(some_class.example)
...

Note: Some IDEs like PyCharm may complain about having async __new__ and __init__, but it does work with Python 3.6+.

You may be able to work around the syntax error in your sub-class by defining your __init__ method under a different name, and then assigning __init__ = _your_real_init, much like this class does.

Original source: https://stackoverflow.com/a/45364670
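
The idea behind an async constructor can be sketched with the standard library alone. This is only an illustration, not the aobject source: AsyncInitBase and Greeter are hypothetical names, and the sketch assumes the approach from the Stack Overflow answer linked above (an async __new__ which awaits __init__). It also shows the "define __init__ under a different name" workaround:

```python
import asyncio


class AsyncInitBase:
    """Hypothetical stand-in for aobject (not the real implementation)."""

    async def __new__(cls, *args, **kwargs):
        # Calling the class returns this coroutine, so the caller must await it.
        instance = super().__new__(cls)
        await instance.__init__(*args, **kwargs)
        return instance

    async def __init__(self):
        pass


class Greeter(AsyncInitBase):
    # The real initialiser lives under another name to keep IDEs quiet...
    async def _async_init(self, name="world"):
        await asyncio.sleep(0)  # placeholder for real async work
        self.greeting = f"hello {name}"

    # ...and is then assigned to __init__.
    __init__ = _async_init


async def build_greeter():
    return await Greeter("asyncx")


greeter = asyncio.run(build_greeter())
print(greeter.greeting)
```

Because calling the class returns a coroutine rather than an instance, Python does not invoke __init__ a second time on its own; the async __new__ is the only place it gets awaited.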

privex.helpers.asyncx.async_sync(f)[source]

Async Synchronous Decorator, borrowed from https://stackoverflow.com/a/23036785/2648583 - added this PyDoc comment and support for returning data from a synchronous function

Allows a non-async function to run async functions using yield from - and can also return data

Useful for unit testing, since unittest.TestCase functions are synchronous.

Example async function:

>>> async def my_async_func(a, b, x=None, y=None):
...     return a, b, x, y
...

Using the above async function with a non-async function:

>>> @async_sync
... def sync_function():
...     result = yield from my_async_func(1, 2, x=3, y=4)
...     return result
...
>>> r = sync_function()
>>> print(r)
(1, 2, 3, 4)
>>> print(r[1])
2
async privex.helpers.asyncx.await_if_needed(func: Union[callable, Coroutine, Awaitable, Any], *args, **kwargs)[source]

Call, await, and/or simply return func depending on whether it’s an async function reference (coroutine function), a non-awaited coroutine, a standard synchronous function, or just a plain old string.

Helps take the guess work out of parameters which could be a string, a synchronous function, an async function, or a coroutine which hasn’t been awaited.

>>> def sync_func(hello, world=1):
...     return f"sync hello: {hello} {world}"
>>> async def async_func(hello, world=1):
...     return f"async hello: {hello} {world}"
>>> await await_if_needed(sync_func, 3, world=2)
'sync hello: 3 2'
>>> await await_if_needed(async_func, 5, 4)
'async hello: 5 4'
>>> f = async_func(5, 4)
>>> await await_if_needed(f)
'async hello: 5 4'
Parameters
  • func (callable|Coroutine|Awaitable|Any) – The function/object to await/call if needed.

  • args – If func is a function/method, will forward any positional arguments to the function

  • kwargs – If func is a function/method, will forward any keyword arguments to the function

Return Any func_data

The result of the awaited func, or the original func if not a coroutine nor callable/awaitable
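
The dispatch described above can be approximated using only the standard library. The sketch below is an assumption about how such a helper could be written, not the code of await_if_needed itself; maybe_await and render_title are hypothetical names:

```python
import asyncio
import inspect


async def maybe_await(obj, *args, **kwargs):
    """Hypothetical helper: call, await, or return obj as appropriate."""
    if inspect.iscoroutinefunction(obj):
        return await obj(*args, **kwargs)
    if inspect.isawaitable(obj):
        # Already a coroutine / awaitable: args and kwargs do not apply.
        return await obj
    if callable(obj):
        result = obj(*args, **kwargs)
        # A synchronous wrapper may itself hand back an awaitable.
        return await result if inspect.isawaitable(result) else result
    # Plain value such as a string: return it untouched.
    return obj


async def render_title(title):
    # 'title' may be a string, a sync function, or an async function.
    return f"<h1>{await maybe_await(title)}</h1>"


async def async_title():
    return "async title"


rendered = asyncio.run(render_title("plain title"))
rendered_sync = asyncio.run(render_title(lambda: "sync title"))
rendered_async = asyncio.run(render_title(async_title))
```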

privex.helpers.asyncx.awaitable(func: Callable) → Callable[source]

Decorator which helps with creation of async wrapper functions.

Usage

Define your async function as normal, then create a standard python function using this decorator - the function should just call your async function and return it.

>>> from typing import Any, Coroutine, Union
>>> async def some_func_async(a: str, b: str):
...     c = a + b
...     return c
...
>>> @awaitable
... def some_func(a, b) -> Union[str, Coroutine[Any, Any, str]]:
...     return some_func_async(a, b)
...

Now, inside of async functions, we just await the wrapper function as if it were the original async function.

>>> async def my_async_func():
...     res = await some_func("hello", "world")
...

While inside of synchronous functions, we call the wrapper function as if it were a normal synchronous function. The decorator will create an asyncio event loop, run the function, then return the result - transparent to the calling function.

>>> def my_sync_func():
...     res = some_func("hello", "world")
...

Blacklists

If you mix a lot of synchronous and asynchronous code, sniffio may return coroutines to synchronous functions that were called from asynchronous functions, which can of course cause problems.

To avoid this issue, you can blacklist function names, module names (and their sub-modules), and/or fully qualified module paths to functions/methods.

Three blacklists are available in this module, which allow you to specify caller functions/methods, modules, or fully qualified module paths to functions/methods for which awaitable() wrapped functions/methods should always execute in an event loop and return synchronously.

Example:

>>> from privex.helpers import asyncx
>>> # All code within the module 'some.module' and its sub-modules will always have awaitable's run their wrapped
>>> # functions synchronously.
>>> asyncx.AWAITABLE_BLACKLIST_MODS += ['some.module']
>>> # Whenever a function with the name 'example_func' (in any module) calls an awaitable, it will always run synchronously
>>> asyncx.AWAITABLE_BLACKLIST_FUNCS += ['example_func']
>>> # Whenever the specific class method 'other.module.SomeClass.some_sync' calls an awaitable, it will always run synchronously.
>>> asyncx.AWAITABLE_BLACKLIST += ['other.module.SomeClass.some_sync']

Original source: https://github.com/encode/httpx/issues/572#issuecomment-562179966
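
To illustrate the general idea of a wrapper that behaves differently in sync and async callers, here is a minimal sketch. It is not the awaitable() implementation: dual_mode and in_running_loop are hypothetical names, the context check uses asyncio.get_running_loop() from the standard library rather than sniffio, and the blacklists described above are left out.

```python
import asyncio
import functools


def in_running_loop() -> bool:
    """Hypothetical check: is an asyncio event loop running in this thread?"""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def dual_mode(func):
    """Hypothetical decorator, loosely modelled on the behaviour described above."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        coro = func(*args, **kwargs)
        if in_running_loop():
            # Async caller: hand the coroutine back so it can be awaited.
            return coro
        # Sync caller: run the coroutine to completion and return its result.
        return asyncio.run(coro)
    return wrapper


async def add_async(a: str, b: str) -> str:
    return a + b


@dual_mode
def add(a, b):
    return add_async(a, b)


async def async_caller():
    return await add("foo", "bar")


sync_result = add("hello", "world")
async_result = asyncio.run(async_caller())
```

The sketch also shows why the blacklists exist: a synchronous function called from inside a running loop would receive a coroutine from add(), since the check only looks at whether a loop is running, not at who the caller is.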

privex.helpers.asyncx.awaitable_class(cls: Type[T]) → Type[T][source]

Wraps a class, allowing all async methods to be used in non-async code as if they were normal synchronous methods.

Example Usage

Simply decorate your class with @awaitable_class (no brackets! takes no arguments), and once you create an instance of your class, all of your async methods can be used by synchronous code as if they were plain functions:

>>> from privex.helpers import awaitable_class
>>>
>>> @awaitable_class
... class ExampleAsyncCls:
...     async def example_async(self):
...         return "hello async world"
...
...     def example_sync(self):
...         return "hello non-async world"
...

NOTE - You can also wrap a class without using a decorator - just pass the class as the first argument like so:

>>> class _OtherExample:
...     async def hello(self):
...         return 'world'
>>> OtherExample = awaitable_class(_OtherExample)

If we call .example_async() on the above class from a synchronous REPL, it will return 'hello async world' as if it were a normal synchronous method. We can also call the non-async .example_sync() which works like normal:

>>> k = ExampleAsyncCls()
>>> k.example_async()
'hello async world'
>>> k.example_sync()
'hello non-async world'

However, inside of an async context (e.g. an async function), awaitable_class will be returning coroutines, so you should await the methods, as you would expect when dealing with an async function:

>>> async def test_async():
...     exmp = ExampleAsyncCls()
...     return await exmp.example_async()
...
>>> await test_async()
'hello async world'
Parameters

cls (type) – The class to wrap

Return type wrapped_class

The class after being wrapped

async privex.helpers.asyncx.call_sys_async(proc, *args, write: Union[bytes, str] = None, **kwargs) → Tuple[bytes, bytes][source]

Async version of call_sys() - works exactly the same, other than needing to be await’d. Run a process proc with the arguments *args, optionally piping data (write) into the process’s stdin - then returns the stdout and stderr of the process.

By default, stdout and stdin are set to asyncio.subprocess.PIPE while stderr defaults to asyncio.subprocess.STDOUT. You can override these by passing new values as keyword arguments.

While it’s recommended to use the file descriptor constants from the asyncio.subprocess module, they’re generally just aliases to the constants in subprocess, meaning subprocess.PIPE should work the same as asyncio.subprocess.PIPE.

Simple Example:

>>> from privex.helpers import call_sys_async, stringify
>>> # All arguments are automatically quoted if required, so spaces are completely fine.
>>> folders, _ = await call_sys_async('ls', '-la', '/tmp/spaces are fine/hello world')
>>> print(stringify(folders))
total 0
drwxr-xr-x  3 user  wheel  96  6 Dec 17:46 .
drwxr-xr-x  3 user  wheel  96  6 Dec 17:46 ..
-rw-r--r--  1 user  wheel   0  6 Dec 17:46 example

Piping data into a process:

>>> data = "hello world"
>>> # The data "hello world" will be piped into wc's stdin, and wc's stdout + stderr will be returned
>>> out, _ = await call_sys_async('wc', '-c', write=data)
>>> int(out)
11
Parameters
  • proc (str) – The process to execute.

  • args (str) – Any arguments to pass to the process proc as positional arguments.

  • write (bytes|str) – If this is not None, then this data will be piped into the process’s STDIN.

Key stdout

The subprocess file descriptor for stdout, e.g. asyncio.subprocess.PIPE

Key stderr

The subprocess file descriptor for stderr, e.g. asyncio.subprocess.PIPE or asyncio.subprocess.STDOUT

Key stdin

The subprocess file descriptor for stdin, e.g. asyncio.subprocess.PIPE

Key cwd

Set the current/working directory of the process to this path, instead of the CWD of your calling script.

Return tuple output

A tuple containing the process output of stdout and stderr
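
The behaviour described above maps closely onto asyncio.create_subprocess_exec(). The following is a sketch under that assumption, not the call_sys_async source; run_proc is a hypothetical name, and the child process is the current Python interpreter so that the example does not depend on ls or wc being installed:

```python
import asyncio
import sys


async def run_proc(proc, *args, write=None, **kwargs):
    """Hypothetical helper mirroring the defaults described above."""
    kwargs.setdefault("stdout", asyncio.subprocess.PIPE)
    kwargs.setdefault("stderr", asyncio.subprocess.STDOUT)
    kwargs.setdefault("stdin", asyncio.subprocess.PIPE)
    # Arguments are passed as a list, so no shell quoting is involved.
    handle = await asyncio.create_subprocess_exec(proc, *args, **kwargs)
    if isinstance(write, str):
        write = write.encode("utf-8")
    # communicate() feeds stdin (if any) and collects (stdout, stderr).
    return await handle.communicate(input=write)


# Child program: count the characters arriving on stdin.
counter = "import sys; print(len(sys.stdin.read()))"
out, err = asyncio.run(run_proc(sys.executable, "-c", counter, write="hello world"))
print(int(out))
```

With stderr redirected into stdout, the second element of the returned tuple is None in this sketch, because there is no separate stderr pipe to read from.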

privex.helpers.asyncx.coro_thread_func(func: callable, *t_args, _output_queue: Optional[Union[queue.Queue, str]] = None, **t_kwargs)[source]

This function is not intended to be called directly. It’s designed to be used as the target of a threading.Thread.

Runs the coroutine function func using a new event loop for the thread this function is running within, and relays the result or an exception (if one was raised) via the queue.Queue _output_queue

See the higher level run_coro_thread() for more info.

Parameters
  • func (callable) – A reference to the async def coroutine function that you want to run

  • t_args – Positional arguments to pass-through to the coroutine function

  • _output_queue

    (default: None) The queue.Queue to emit the result or raised exception through. This can also be set to None to disable transmitting the result/exception via a queue.

    This can also be set to the string "default", which means the result/exception will be transmitted via the asyncx private queue _coro_thread_queue

  • t_kwargs – Keyword arguments to pass-through to the coroutine function

privex.helpers.asyncx.get_async_type(obj) → str[source]

Detects whether obj is an async object/function that needs to be awaited / called, a synchronous callable, or unknown (probably not async)

>>> def sync_func(hello, world=1): return f"sync hello: {hello} {world}"
>>> async def async_func(hello, world=1): return f"async hello: {hello} {world}"
>>> get_async_type(async_func)
'coro func'
>>> get_async_type(async_func(5))
'coro'
>>> get_async_type(sync_func)
'sync func'
>>> get_async_type(sync_func(10))
'unknown'
Parameters

obj (Any) – Object to check for async type

Return str async_type

Either 'coro func', 'coro', 'awaitable', 'sync func' or 'unknown'

privex.helpers.asyncx.is_async_context() → bool[source]

Returns True if currently in an async context, otherwise False

privex.helpers.asyncx.loop_run(coro: Union[Coroutine, Type[Coroutine], Callable], *args, _loop=None, **kwargs) → Any[source]

Run the coroutine or async function coro synchronously, using an AsyncIO event loop.

If the keyword argument _loop isn’t specified, it defaults to the loop returned by asyncio.get_event_loop()

If coro doesn’t appear to be a coroutine or async function:

  • If coro is a normal callable object e.g. a function, then it’ll be called.

    • If the object returned after calling coro(*args, **kwargs) is a co-routine / async func, then it’ll call loop_run again, passing the object returned from calling it, and returning the result from that recursive call.

    • If the returned object isn’t an async func / co-routine, then the object will be returned as-is.

  • Otherwise, coro will just be returned back to the caller.
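
The fallback rules above can be sketched as follows. This is an illustration only, not the loop_run source: run_in_loop is a hypothetical name, and when no _loop is given it creates a fresh event loop with asyncio.new_event_loop() instead of using asyncio.get_event_loop().

```python
import asyncio
import inspect


def run_in_loop(coro, *args, _loop=None, **kwargs):
    """Hypothetical sketch of the decision tree described above."""
    if inspect.iscoroutinefunction(coro):
        # Async function reference: call it to obtain the coroutine.
        coro = coro(*args, **kwargs)
    if inspect.isawaitable(coro):
        loop = _loop if _loop is not None else asyncio.new_event_loop()
        try:
            return loop.run_until_complete(coro)
        finally:
            if _loop is None:
                loop.close()  # only close loops created by this function
    if callable(coro):
        result = coro(*args, **kwargs)
        if inspect.isawaitable(result) or inspect.iscoroutinefunction(result):
            # e.g. a sync wrapper that returned a coroutine: recurse.
            return run_in_loop(result, _loop=_loop)
        return result
    # Neither awaitable nor callable: hand it back untouched.
    return coro


async def add(x, y):
    return x + y


from_coroutine = run_in_loop(add(3, 4))
from_reference = run_in_loop(add, 10, y=20)
from_wrapper = run_in_loop(lambda: add(1, 2))
passthrough = run_in_loop("not async at all")
```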

Example Usage

First we’ll define the async function some_func to use as an example:

>>> async def some_func(x, y):
...     return x + y

Option 1 - Call an async function directly with any args/kwargs required, then pass the coroutine returned:

>>> loop_run(some_func(3, 4))
7

Option 2 - Pass a reference to the async function, and pass any required args/kwargs straight to loop_run() - the function will be called with the args/kwargs you provide, then the resulting coroutine will be run in an event loop:

>>> loop_run(some_func, 10, y=20)    # Opt 2. Pass the async function and include any args/kwargs for the call
30
Parameters
  • coro – A co-routine, or reference to an async function to be run synchronously

  • args – Any positional arguments to pass to coro (if it’s a function reference and not a coroutine)

  • _loop (asyncio.base_events.BaseEventLoop) – (kwarg only!) If passed, will run coro in this event loop, instead of asyncio.get_event_loop()

  • kwargs – Any keyword arguments to pass to coro (if it’s a function reference and not a coroutine)

Return Any coro_result

The returned data from executing the coroutine / async function

privex.helpers.asyncx.run_coro_thread(func: callable, *args, **kwargs) → Any[source]

Run a Python AsyncIO coroutine function within a new event loop using a thread, and return the result / raise any exceptions as if it were run normally within an AsyncIO function.

Caution

If you want to run a coroutine within a thread from an AsyncIO function/method, then you should use run_coro_thread_async() instead, which uses asyncio.sleep() while waiting for a result/exception to be transmitted via a queue.

This allows you to run and wait for multiple coroutine threads simultaneously, as there’s no synchronous blocking wait - unlike this function.

This will usually allow you to run coroutines from a synchronous function without running into the dreaded “Event loop is already running” error - since the coroutine will be run inside of a thread with its own dedicated event loop.

Example Usage:

>>> async def example_func(lorem: int, ipsum: int):
...     if lorem > 100: raise AttributeError("lorem is greater than 100!")
...     return f"example: {lorem + ipsum}"
>>> run_coro_thread(example_func, 10, 20)
'example: 30'
>>> run_coro_thread(example_func, 3, ipsum=6)
'example: 9'
>>> run_coro_thread(example_func, lorem=40, ipsum=1)
'example: 41'
>>> run_coro_thread(example_func, 120, 50)
Traceback (most recent call last):
  File "", line 2, in example_func
    if lorem > 100: raise AttributeError("lorem is greater than 100!")
AttributeError: lorem is greater than 100!

Creates a new threading.Thread with the target coro_thread_func() (via run_coro_thread_base()), passing it the coroutine function func along with the positional args and keyword kwargs. The thread target creates a new event loop, and then runs func within that thread's event loop.

Uses the private queue.Queue threading queue _coro_thread_queue to safely relay back to the calling thread - either the result from the coroutine, or an exception if one was raised while trying to run the coroutine.
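
The mechanism described above (thread, dedicated event loop, queue relay) can be sketched with the standard library. This is not the library's code: run_in_thread and _thread_target are hypothetical names, and the sketch uses a fresh queue.Queue per call rather than a shared private queue.

```python
import asyncio
import queue
import threading


def _thread_target(func, out_queue, args, kwargs):
    # Each thread gets its own event loop, independent of any loop in the caller.
    loop = asyncio.new_event_loop()
    try:
        out_queue.put(("ok", loop.run_until_complete(func(*args, **kwargs))))
    except Exception as exc:
        # Relay the exception so the calling thread can re-raise it.
        out_queue.put(("err", exc))
    finally:
        loop.close()


def run_in_thread(func, *args, **kwargs):
    """Hypothetical sketch: run coroutine function func in a worker thread."""
    out_queue = queue.Queue()
    worker = threading.Thread(target=_thread_target, args=(func, out_queue, args, kwargs))
    worker.start()
    status, payload = out_queue.get()  # blocks until the thread reports back
    worker.join()
    if status == "err":
        raise payload
    return payload


async def example_func(lorem: int, ipsum: int):
    if lorem > 100:
        raise AttributeError("lorem is greater than 100!")
    return f"example: {lorem + ipsum}"


ok_result = run_in_thread(example_func, 10, ipsum=20)
try:
    run_in_thread(example_func, 120, 50)
    raised = None
except AttributeError as exc:
    raised = str(exc)
```

The blocking out_queue.get() is what makes this variant unsuitable inside an async function, which is the reason the caution above points to run_coro_thread_async().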

Parameters
  • func (callable) – A reference to the async def coroutine function that you want to run

  • args – Positional arguments to pass-through to the coroutine function

  • kwargs – Keyword arguments to pass-through to the coroutine function

Return Any coro_res

The result returned from the coroutine func

async privex.helpers.asyncx.run_coro_thread_async(func: callable, *args, _queue_timeout=30.0, _queue_sleep=0.05, **kwargs) → Any[source]

AsyncIO version of run_coro_thread() which uses asyncio.sleep() while waiting on a result from the queue, allowing you to run multiple AsyncIO coroutines which call blocking synchronous code - simultaneously, e.g. by using asyncio.gather()

Below is an example of running an example coroutine hello which runs the synchronous blocking time.sleep. Using run_coro_thread_async() plus asyncio.gather() - we can run hello 4 times simultaneously, despite the use of the blocking time.sleep().

Basic usage:

>>> import asyncio
>>> import time
>>> from privex.helpers.asyncx import run_coro_thread_async
>>> async def hello(world):
...     time.sleep(1)
...     return world * 10
>>> await asyncio.gather(run_coro_thread_async(hello, 5), run_coro_thread_async(hello, 15),
...                      run_coro_thread_async(hello, 90), run_coro_thread_async(hello, 25))
[50, 150, 900, 250]
Parameters
  • func (callable) – A reference to the async def coroutine function that you want to run

  • args – Positional arguments to pass-through to the coroutine function

  • kwargs – Keyword arguments to pass-through to the coroutine function

  • _queue_timeout (float|int) – (default: 30) Maximum number of seconds to wait for a result or exception from func before giving up.

  • _queue_sleep – (default: 0.05) Amount of time to AsyncIO sleep between each check of the result queue

Return Any coro_res

The result returned from the coroutine func
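
The polling approach described above can be sketched as below. This is an assumption about the general technique, not the library's implementation: run_in_thread_async and _worker are hypothetical names, each call uses its own queue, and raising TimeoutError when the deadline passes is a choice made for this sketch only.

```python
import asyncio
import queue
import threading
import time


def _worker(func, out_queue, args, kwargs):
    try:
        # asyncio.run() gives this thread its own short-lived event loop.
        out_queue.put((True, asyncio.run(func(*args, **kwargs))))
    except Exception as exc:
        out_queue.put((False, exc))


async def run_in_thread_async(func, *args, _queue_timeout=30.0, _queue_sleep=0.05, **kwargs):
    """Hypothetical sketch: poll the result queue without blocking the event loop."""
    out_queue = queue.Queue()
    threading.Thread(target=_worker, args=(func, out_queue, args, kwargs), daemon=True).start()
    deadline = time.monotonic() + _queue_timeout
    while time.monotonic() < deadline:
        try:
            ok, payload = out_queue.get_nowait()
        except queue.Empty:
            # Yield to other tasks instead of blocking on the queue.
            await asyncio.sleep(_queue_sleep)
            continue
        if ok:
            return payload
        raise payload
    raise TimeoutError(f"no result within {_queue_timeout} seconds")


async def hello(world):
    time.sleep(0.2)  # blocking call, deliberately not asyncio.sleep
    return world * 10


async def main():
    return await asyncio.gather(*(run_in_thread_async(hello, n) for n in (5, 15, 90, 25)))


results = asyncio.run(main())
print(results)
```

asyncio.gather() returns results in the order the awaitables were passed, regardless of which thread finishes first.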

privex.helpers.asyncx.run_coro_thread_base(func: callable, *args, _daemon_thread=False, **kwargs) → threading.Thread[source]

This is a wrapper function which runs coro_thread_func() within a thread, passing func, args and kwargs to it.

See the higher level run_coro_thread() for more info.

Parameters
  • func (callable) – A reference to the async def coroutine function that you want to run

  • args – Positional arguments to pass-through to the coroutine function

  • _daemon_thread (bool) – (Default: False) Must be specified as a kwarg. Controls whether the generated threading.Thread is set as a daemon thread.

  • kwargs – Keyword arguments to pass-through to the coroutine function

  • _output_queue (queue.Queue) – A queue.Queue through which coro_thread_func() transmits the coroutine’s result, or any raised exceptions.

Return threading.Thread t_co

A started (but not joined) thread object for your caller to manage.

privex.helpers.asyncx.run_sync(func, *args, **kwargs)[source]

Run an async function synchronously (useful for REPL testing async functions). (TIP: Consider using loop_run() instead)

Attention

For most cases, you should use the function loop_run() instead of this. Unlike run_sync, loop_run() is able to handle async function references, coroutines, as well as coroutines / async functions which are wrapped in an outer non-async function (e.g. an @awaitable wrapper).

loop_run() also supports using a custom event loop, instead of being limited to asyncio.get_event_loop()

Usage:

>>> async def my_async_func(a, b, x=None, y=None):
...     return a, b, x, y
>>>
>>> run_sync(my_async_func, 1, 2, x=3, y=4)
(1, 2, 3, 4)
Parameters
  • func (callable) – An asynchronous function to run

  • args – Positional arguments to pass to func

  • kwargs – Keyword arguments to pass to func