Source code for privex.helpers.cache.asyncx.base

"""
Core classes/functions used by AsyncIO Cache Adapters, including the base class :class:`.AsyncCacheAdapter`

"""
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Optional

from privex.helpers.common import empty_if
from privex.helpers.exceptions import CacheNotFound

from privex.helpers.cache.CacheAdapter import CacheAdapter
from privex.helpers.settings import DEFAULT_CACHE_TIMEOUT
from privex.helpers.types import VAL_FUNC_CORO


[docs]class AsyncCacheAdapter(CacheAdapter, ABC): """ **AsyncCacheAdapter** is an abstract base class based on :class:`.CacheAdapter`, but with all methods designated as coroutines. Cache adapters which make use of AsyncIO, including via asyncio compatible libraries (e.g. ``aioredis``), should use this class as their parent instead of :class:`.CacheAdapter`. To retain the functionality of :meth:`.__getitem__` and :meth:`.__setitem__`, it obtains an event loop using :func:`asyncio.get_event_loop`, and then wraps :meth:`.get` or :meth:`.set` respectively using ``loop.run_until_complete`` to be able to run them within the synchronous get/setitem magic methods. It overrides :meth:`.get_or_set` to convert it into an async method, and overrides :meth:`.get_or_set_async` so that :meth:`.get` and :meth:`.set` are correctly awaited within the method. """ adapter_enter_reconnect: bool = True """ Controls whether :meth:`.__aenter__` automatically calls :meth:`.reconnect` to clear and re-create any previous connections/instances for the adapter. """ adapter_exit_close: bool = True """ Controls whether :meth:`.__aexit__` automatically calls :meth:`.close` to close any connections/instances and destroy library class instances from the current adapter instance. """ ins_enter_reconnect: bool """ Per-instance version of :attr:`.adapter_enter_reconnect`, which is set via ``enter_reconnect`` the constructor. When ``__init__`` ``enter_reconnect`` is empty, it inherits the class attribute value from :attr:`.adapter_enter_reconnect` """ ins_exit_close: bool """ Per-instance version of :attr:`.adapter_exit_close`, which is set via ``exit_close`` the constructor. When ``__init__`` ``exit_close`` is empty, it inherits the class attribute value from :attr:`.adapter_exit_close` """
[docs] def __init__(self, *args, enter_reconnect: Optional[bool] = None, exit_close: Optional[bool] = None, **kwargs): self.ins_enter_reconnect = empty_if(enter_reconnect, self.adapter_enter_reconnect) self.ins_exit_close = empty_if(exit_close, self.adapter_exit_close) super().__init__(*args, **kwargs)
[docs] async def get_or_set(self, key: str, value: VAL_FUNC_CORO, timeout: int = DEFAULT_CACHE_TIMEOUT) -> Any: return await self.get_or_set_async(key=key, value=value, timeout=timeout)
[docs] async def get_or_set_async(self, key: str, value: VAL_FUNC_CORO, timeout: int = DEFAULT_CACHE_TIMEOUT) -> Any: key, timeout = str(key), int(timeout) try: k = await self.get(key, fail=True) except CacheNotFound: k = value if asyncio.iscoroutinefunction(value): k = await value(key) if asyncio.iscoroutine(value): k = await value elif callable(value): k = value(key) await self.set(key=key, value=k, timeout=timeout) return k
[docs] @abstractmethod async def get(self, key: str, default: Any = None, fail: bool = False) -> Any: raise NotImplemented(f'{self.__class__.__name__} must implement .get()')
[docs] @abstractmethod async def set(self, key: str, value: Any, timeout: Optional[int] = DEFAULT_CACHE_TIMEOUT): raise NotImplemented(f'{self.__class__.__name__} must implement .set()')
[docs] @abstractmethod async def remove(self, *key: str) -> bool: raise NotImplemented(f'{self.__class__.__name__} must implement .remove()')
[docs] @abstractmethod async def update_timeout(self, key: str, timeout: int = DEFAULT_CACHE_TIMEOUT) -> Any: raise NotImplemented(f'{self.__class__.__name__} must implement .extend_timeout()')
[docs] async def close(self, *args, **kwargs) -> Any: """ Close any cache library connections, and destroy their local class instances by setting them to ``None``. """ return f"close() is not implemented by {self.__class__.__name__}"
[docs] async def connect(self, *args, **kwargs) -> Any: """ Create an instance of the library used to interact with the caching system, ensure it's connection is open, and store the instance on this class instance - only if not already connected. Should return the class instance which was created. """ return f"close() is not implemented by {self.__class__.__name__}"
[docs] async def reconnect(self, *args, **kwargs) -> Any: """ Calls :meth:`.close` to close any previous connections and cleanup instances, then re-create the connection(s)/instance(s) by calling :meth:`.connect` """ await self.close() return await self.connect()
def __getitem__(self, item): loop = asyncio.get_event_loop() try: return loop.run_until_complete(self.get(key=item, fail=True)) except CacheNotFound: raise KeyError(f'Key "{item}" not found in cache.') def __setitem__(self, key, value): loop = asyncio.get_event_loop() return loop.run_until_complete(self.set(key=key, value=value)) # async def __aenter__(self): # """ # Before starting a context manager, we close and cleanup any previous connection and re-create a fresh connection # and instance, ensuring no conflicts such as connections/instances attached to other AsyncIO event loops :) # """ # if self.ins_enter_reconnect: # await self.reconnect() # return self # # async def __aexit__(self, exc_type, exc_val, exc_tb): # """ # Once a context manager is finished, we close and cleanup any instances / connections, ensuring no conflicts # for code that might want to use the cache adapter in a different event loops. # """ # if self.ins_exit_close: # await self.close() def __enter__(self): loop = asyncio.get_event_loop() return loop.run_until_complete(self.__aenter__()) def __exit__(self, exc_type, exc_val, exc_tb): loop = asyncio.get_event_loop() return loop.run_until_complete(self.__aexit__(exc_type, exc_val, exc_tb))