Source code for privex.helpers.cache.asyncx.base

import asyncio
from abc import ABC, abstractmethod
from typing import Any, Optional

from privex.helpers.exceptions import CacheNotFound

from privex.helpers.cache.CacheAdapter import CacheAdapter
from privex.helpers.settings import DEFAULT_CACHE_TIMEOUT
from privex.helpers.types import VAL_FUNC_CORO


[docs]class AsyncCacheAdapter(CacheAdapter, ABC): """ **AsyncCacheAdapter** is an abstract base class based on :class:`.CacheAdapter`, but with all methods designated as coroutines. Cache adapters which make use of AsyncIO, including via asyncio compatible libraries (e.g. ``aioredis``), should use this class as their parent instead of :class:`.CacheAdapter`. To retain the functionality of :meth:`.__getitem__` and :meth:`.__setitem__`, it obtains an event loop using :func:`asyncio.get_event_loop`, and then wraps :meth:`.get` or :meth:`.set` respectively using ``loop.run_until_complete`` to be able to run them within the synchronous get/setitem magic methods. It overrides :meth:`.get_or_set` to convert it into an async method, and overrides :meth:`.get_or_set_async` so that :meth:`.get` and :meth:`.set` are correctly awaited within the method. """
[docs] async def get_or_set(self, key: str, value: VAL_FUNC_CORO, timeout: int = DEFAULT_CACHE_TIMEOUT) -> Any: return await self.get_or_set_async(key=key, value=value, timeout=timeout)
[docs] async def get_or_set_async(self, key: str, value: VAL_FUNC_CORO, timeout: int = DEFAULT_CACHE_TIMEOUT) -> Any: key, timeout = str(key), int(timeout) try: k = await self.get(key, fail=True) except CacheNotFound: k = value if asyncio.iscoroutinefunction(value): k = await value(key) if asyncio.iscoroutine(value): k = await value elif callable(value): k = value(key) await self.set(key=key, value=k, timeout=timeout) return k
[docs] @abstractmethod async def get(self, key: str, default: Any = None, fail: bool = False) -> Any: raise NotImplemented(f'{self.__class__.__name__} must implement .get()')
[docs] @abstractmethod async def set(self, key: str, value: Any, timeout: Optional[int] = DEFAULT_CACHE_TIMEOUT): raise NotImplemented(f'{self.__class__.__name__} must implement .set()')
[docs] @abstractmethod async def remove(self, *key: str) -> bool: raise NotImplemented(f'{self.__class__.__name__} must implement .remove()')
[docs] @abstractmethod async def update_timeout(self, key: str, timeout: int = DEFAULT_CACHE_TIMEOUT) -> Any: raise NotImplemented(f'{self.__class__.__name__} must implement .extend_timeout()')
def __getitem__(self, item): loop = asyncio.get_event_loop() try: return loop.run_until_complete(self.get(key=item, fail=True)) except CacheNotFound: raise KeyError(f'Key "{item}" not found in cache.') def __setitem__(self, key, value): loop = asyncio.get_event_loop() return loop.run_until_complete(self.set(key=key, value=value)) async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): pass def __enter__(self): loop = asyncio.get_event_loop() return loop.run_until_complete(self.__aenter__()) def __exit__(self, exc_type, exc_val, exc_tb): loop = asyncio.get_event_loop() return loop.run_until_complete(self.__aexit__(exc_type, exc_val, exc_tb))