Source code for privex.helpers.plugin

"""
This module handles connection objects for databases, APIs etc. by exposing functions which initialise and
store class instances for re-use.

It's primarily intended to be used to enable database, caching and third-party API connectivity for the helpers in this
package, however, you're free to use the functions / classes / attributes exposed in this module for your own apps.

Classes are generally initialised using the settings from :py:mod:`.settings` - see the docs for that module to learn
how to override the settings if the defaults don't work for you.


**Copyright**::

        +===================================================+
        |                 © 2019 Privex Inc.                |
        |               https://www.privex.io               |
        +===================================================+
        |                                                   |
        |        Originally Developed by Privex Inc.        |
        |        License: X11 / MIT                         |
        |                                                   |
        |        Core Developer(s):                         |
        |                                                   |
        |          (+)  Chris (@someguy123) [Privex]        |
        |          (+)  Kale (@kryogenic) [Privex]          |
        |                                                   |
        +===================================================+

    Copyright 2019     Privex Inc.   ( https://www.privex.io )


"""
import logging
import threading
from os import path
from typing import Any, Union, Optional

from privex.helpers.collections import DictObject

from privex.helpers import settings
from privex.helpers.types import T
from privex.helpers.common import empty_if
from privex.helpers.exceptions import GeoIPDatabaseNotFound

log = logging.getLogger(__name__)

__all__ = [
    'HAS_REDIS', 'HAS_ASYNC_REDIS', 'HAS_DNSPYTHON', 'HAS_CRYPTO', 'HAS_SETUPPY_BUMP',
    'HAS_SETUPPY_COMMANDS', 'HAS_SETUPPY_COMMON'
]

HAS_REDIS = False
"""If the ``redis`` module was imported successfully, this will change to True."""

HAS_ASYNC_REDIS = False
"""If the ``aioredis`` module was imported successfully, this will change to True."""

HAS_DNSPYTHON = False
"""If the ``dns.resolver`` module was imported successfully, this will change to True."""

HAS_CRYPTO = False
"""If :py:mod:`privex.helpers.crypto` was imported successfully, this will change to True"""

HAS_SETUPPY_COMMON = False
"""If :py:mod:`privex.helpers.setuppy.common` was imported successfully, this will change to True"""

HAS_SETUPPY_BUMP = False
"""If :py:mod:`privex.helpers.setuppy.bump` was imported successfully, this will change to True"""

HAS_SETUPPY_COMMANDS = False
"""If :py:mod:`privex.helpers.setuppy.commands` was imported successfully, this will change to True"""

HAS_GEOIP = False
"""If :py:mod:`privex.helpers.geoip` was imported successfully, this will change to True"""

__STORE = dict(threads={})
"""This ``dict`` is used to store initialised classes for connections to databases, APIs etc."""


def _get_threadstore(name=None, fallback=None, thread_id=None) -> Any:
    thread_id = empty_if(thread_id, threading.get_ident())
    if thread_id not in __STORE['threads']:
        __STORE['threads'][thread_id] = {}
    thread_store: dict = __STORE['threads'][thread_id]
    if name is None:
        return thread_store
    
    return thread_store.get(name, fallback)


def _set_threadstore(name, obj: T, thread_id=None) -> T:
    thread_store = _get_threadstore(thread_id=thread_id)
    thread_store[name] = obj
    return obj


[docs]def clean_threadstore(thread_id=None, name=None): """ Remove the per-thread instance storage in :attr:`.__STORE`, usually called when a thread is exiting. Example:: >>> def some_thread(): ... r = get_redis() ... print('doing something') ... print('cleaning up...') ... clean_threadstore() # With no arguments, it cleans the thread store for the thread that called it. >>> t = threading.Thread(target=some_thread) >>> t.start() >>> t.join() Usage outside of a thread:: >>> t = threading.Thread(target=some_thread) >>> t.start() >>> thread_id = t.ident # Get the thread ID for the started thread >>> t.join() # Wait for the thread to finish >>> if thread_id is not None: # Make sure the thread ID isn't None ... clean_threadstore(thread_id) # Cleanup any leftover instances, if there are any. ... Removing an individual item from thread store:: >>> def some_thread(): ... r = get_redis() ... print('doing something') ... print('cleaning up...') ... clean_threadstore(name='redis') # Delete only the key 'redis' from the thread store :param thread_id: The ID of the thread (usually from :func:`threading.get_ident`) to clean the storage for. If left as None, will use the ID returned by :func:`threading.get_ident`. :param name: If specified, then only the key ``name`` will be deleted from the thread store, instead of the entire thread store. """ thread_id = empty_if(thread_id, threading.get_ident()) if thread_id in __STORE['threads']: ts = __STORE['threads'][thread_id] if name is None: del __STORE['threads'][thread_id] return True elif name in ts: del ts[name] return True return False
try: import redis
[docs] def connect_redis(*args, **rd_config): from privex.helpers.common import extract_settings # return redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB) return redis.Redis(*args, **extract_settings('REDIS_', settings, merge_conf=dict(rd_config)))
[docs] def get_redis(new_connection=False, thread_id=None, **rd_config) -> redis.Redis: """Get a Redis connection object. Create one if it doesn't exist.""" from privex.helpers.common import empty_if, extract_settings if new_connection: return connect_redis(**rd_config) # if thread_id not in __STORE['threads']: __STORE['threads'][thread_id] = {} # thread_store = __STORE['threads'][thread_id] rd = _get_threadstore('redis', thread_id=thread_id) if rd is None: rd = connect_redis(**rd_config) _set_threadstore('redis', rd, thread_id=thread_id) return rd
# if 'redis' in thread_store and not new_connection: return thread_store['redis'] # if rd is not None and not new_connection: # # # if 'redis' not in thread_store: # if new_connection: # return rd # thread_store['redis'] = rd # return thread_store['redis']
[docs] def close_redis(thread_id=None) -> bool: """ Close the global Redis connection and delete the instance. :param thread_id: Close and delete the Redis instance for this thread ID, instead of the detected current thread :return bool deleted: ``True`` if an instance was found and deleted. ``False`` if there was no existing Redis instance. """ rd: Union[str, redis.Redis] = _get_threadstore('redis', 'NOT_FOUND', thread_id=thread_id) if rd != 'NOT_FOUND': rd.close() clean_threadstore(thread_id=thread_id, name='redis') return True return False
[docs] def reset_redis(thread_id=None) -> redis.Redis: """Close the global Redis connection, delete the instance, then re-instantiate it""" close_redis(thread_id=thread_id) return _set_threadstore('redis', get_redis(True, thread_id), thread_id=thread_id)
[docs] def configure_redis(host=settings.REDIS_HOST, port: int = settings.REDIS_PORT, db: int = settings.REDIS_DB, **kwargs): """Update global Redis settings and re-instantiate the global Redis instance with the new settings.""" thread_id = kwargs.get('thread_id', None) settings.REDIS_DB = db settings.REDIS_PORT = port settings.REDIS_HOST = host return reset_redis(thread_id=thread_id)
__all__ += ['get_redis', 'reset_redis', 'configure_redis', 'close_redis', 'connect_redis'] HAS_REDIS = True except ImportError: log.debug('privex.helpers __init__ failed to import "redis", Redis dependent helpers will be disabled.') except Exception as e: log.debug('privex.helpers __init__ failed to import "redis", (unknown exception), ' 'Redis dependent helpers will be disabled. Exception: %s %s', type(e), str(e)) try: import aioredis HAS_ASYNC_REDIS = True
[docs] async def connect_redis_async(**rd_config) -> aioredis.Redis: from privex.helpers.common import extract_settings cf = extract_settings('REDIS_', settings, merge_conf=dict(rd_config)) _addr = f"redis://{cf.pop('host', 'localhost')}:{cf.pop('port', 6379)}" return await aioredis.create_redis_pool( address=_addr, **cf )
[docs] async def get_redis_async(new_connection=False, thread_id=None, **rd_config) -> aioredis.connection: """ Get an Async Redis connection object. Create one if it doesn't exist. Usage:: >>> redis_conn = await get_redis_async() >>> redis = await redis_conn >>> await redis.set('some_key', 'example') >>> await redis.get('some_key') 'example' """ from privex.helpers.common import empty_if, extract_settings if new_connection: return await connect_redis_async(**rd_config) rd: Optional[aioredis.Redis] = _get_threadstore('aioredis', thread_id=thread_id) if rd is None: rd: aioredis.Redis = _set_threadstore('aioredis', await connect_redis_async(**rd_config), thread_id=thread_id) return rd
[docs] async def close_redis_async(thread_id=None) -> bool: """ Close the global Async Redis connection and delete the instance. :param thread_id: Close and delete the Redis instance for this thread ID, instead of the detected current thread :return bool deleted: ``True`` if an instance was found and deleted. ``False`` if there was no existing Redis instance. """ rd: Union[str, aioredis.connection] = _get_threadstore('aioredis', 'NOT_FOUND', thread_id=thread_id) if rd != 'NOT_FOUND': rd.close() await rd.wait_close() clean_threadstore(thread_id=thread_id, name='aioredis') return True return False
[docs] async def reset_redis_async(thread_id=None) -> aioredis.connection: """Close the global Async Redis connection, delete the instance, then re-instantiate it""" await close_redis_async(thread_id=thread_id) return await _set_threadstore('aioredis', get_redis_async(True, thread_id), thread_id=thread_id)
[docs] async def configure_redis_async(host=settings.REDIS_HOST, port: int = settings.REDIS_PORT, db: int = settings.REDIS_DB, **kwargs): """Update global Redis settings and re-instantiate the global Async Redis instance with the new settings.""" thread_id = kwargs.get('thread_id', None) settings.REDIS_DB = db settings.REDIS_PORT = port settings.REDIS_HOST = host return await reset_redis_async(thread_id=thread_id)
__all__ += ['get_redis_async', 'reset_redis_async', 'configure_redis_async', 'close_redis_async', 'connect_redis_async'] except ImportError: log.debug('privex.helpers __init__ failed to import "aioredis", Async Redis dependent helpers will be disabled.') except Exception as e: log.debug('privex.helpers __init__ failed to import "aioredis", (unknown exception), ' 'Async Redis dependent helpers will be disabled. Exception: %s %s', type(e), str(e)) try: import aiomcache HAS_ASYNC_MEMCACHED = True
[docs] async def connect_memcached_async(**rd_config) -> aiomcache.Client: from privex.helpers.common import extract_settings cf = extract_settings('MEMCACHED_', settings, merge_conf=dict(rd_config)) # if 'loop' not in cf: # cf['loop'] = asyncio.get_event_loop() return aiomcache.Client(**cf)
[docs] async def get_memcached_async(new_connection=False, thread_id=None, **rd_config) -> aiomcache.Client: """Get an Async Memcached connection object. Create one if it doesn't exist.""" from privex.helpers.common import empty_if, extract_settings if new_connection: return await connect_memcached_async(**rd_config) rd: Optional[aiomcache.Client] = _get_threadstore('aiomemcached', thread_id=thread_id) if rd is None: rd: aiomcache.Client = _set_threadstore('aiomemcached', await connect_memcached_async(**rd_config), thread_id=thread_id) return rd
[docs] def close_memcached_async(thread_id=None) -> bool: """ Close the global Async Memcached connection and delete the instance. :param thread_id: Close and delete the Memcached instance for this thread ID, instead of the detected current thread :return bool deleted: ``True`` if an instance was found and deleted. ``False`` if there was no existing Memcached instance. """ rd: Union[str, aiomcache.Client] = _get_threadstore('aiomemcached', 'NOT_FOUND', thread_id=thread_id) if rd != 'NOT_FOUND': rd.close() clean_threadstore(thread_id=thread_id, name='aiomemcached') return True return False
[docs] async def reset_memcached_async(thread_id=None) -> aiomcache.Client: """Close the global Async Memcached connection, delete the instance, then re-instantiate it""" close_memcached_async(thread_id=thread_id) return await _set_threadstore('aiomemcached', get_memcached_async(True, thread_id), thread_id=thread_id)
[docs] async def configure_memcached_async(host=settings.MEMCACHED_HOST, port: int = settings.MEMCACHED_PORT, **kwargs): """Update global Memcached settings and re-instantiate the global Async Redis instance with the new settings.""" thread_id = kwargs.get('thread_id', None) settings.MEMCACHED_PORT = port settings.MEMCACHED_HOST = host return await reset_memcached_async(thread_id=thread_id)
__all__ += ['get_memcached_async', 'reset_memcached_async', 'configure_memcached_async', 'close_memcached_async', 'connect_memcached_async'] except ImportError: log.debug('%s failed to import "aiomcache", Async Memcached dependent helpers will be disabled.', __name__) except Exception as e: log.debug('%s failed to import "aiomcache", (unknown exception), Async Memcached dependent helpers ' 'will be disabled. Exception: %s %s', __name__, type(e), str(e)) try: import geoip2 import geoip2.database
[docs] def connect_geoip(*args, **geo_config) -> geoip2.database.Reader: return geoip2.database.Reader(*args, **geo_config)
[docs] def get_geodbs() -> DictObject: return DictObject({ 'city': DictObject( name=settings.GEOCITY_NAME, path=settings.GEOCITY, detected=settings.GEOCITY_DETECTED ), 'country': DictObject( name=settings.GEOCOUNTRY_NAME, path=settings.GEOCOUNTRY, detected=settings.GEOCOUNTRY_DETECTED ), 'asn': DictObject( name=settings.GEOASN_NAME, path=settings.GEOASN, detected=settings.GEOASN_DETECTED ), })
def _find_geoip(geo_type: str) -> Optional[str]: _geo_dbs = get_geodbs() if geo_type not in _geo_dbs: raise AttributeError("_find_geoip: geo_type must be 'city', 'asn' or 'country'") log.debug("Locating GeoIP %s - _geo_dbs = %s", geo_type.capitalize(), _geo_dbs) for p in settings.search_geoip: g_p = path.join(p, _geo_dbs[geo_type].name) log.debug("Checking if %s exists...", g_p) if path.exists(g_p): log.debug("Found GeoIP %s (%s) in path: %s", geo_type, _geo_dbs[geo_type], g_p) return g_p log.warning("Failed to find GeoIP %s in search paths %s", geo_type.capitalize(), settings.search_geoip) return None
[docs] def get_geoip_db(geo_type: str) -> str: """ Return the full path to the GeoIP2 database for ``geo_type``. If we haven't yet scanned the search paths for the database, then :func:`._find_geoip` will be called to try and locate the database file. If the database is found, the ``_DETECTED`` boolean setting will be changed to ``True`` so we know that the path contained in the :func:`.get_geodbs` result is valid in the future, avoiding unnecessary searches. If the database can't be found anywhere within the search paths, :class:`.GeoIPDatabaseNotFound` will be raised. :param str geo_type: The GeoIP database type: either 'city', 'asn' or 'country' :raises GeoIPDatabaseNotFound: If the database for ``geo_type`` could not be found. :return str path: The full path to the detected GeoIP database """ _geo_dbs = get_geodbs() if geo_type not in _geo_dbs: raise AttributeError("get_geoip_db: geo_type must be 'city', 'asn' or 'country'") gdb = _geo_dbs[geo_type] if gdb.detected: return gdb.path gp = _find_geoip(geo_type) if not gp: log.error("Failed to locate GeoIP ASN + City in alternative search folders.") log.error(f" [!!!] ERROR - Missing GeoIP files. The following files do not exist:\n") log.error(f"\t{gdb.path}") log.error(f"\nCurrent environment settings for GeoIP location:\n") log.error(f"GEOIP_DIR={settings.GEOIP_DIR}") log.error(f"GEOASN_NAME={settings.GEOASN_NAME}") log.error(f"GEOCITY_NAME={settings.GEOCITY_NAME}\n") log.error(f"GEOCOUNTRY_NAME={settings.GEOCOUNTRY_NAME}\n") log.error("Please download the GeoLite2 City and ASN GeoIP databases (Maxmind Binary Database format) " "for FREE from the following URL:\n") log.error("\thttps://dev.maxmind.com/geoip/geoip2/geolite2/\n") log.error("It's recommended to place the .mmdb files in the default folder '/usr/share/GeoIP/' - " "as most applications which use Maxmind GeoIP2 will use that folder by default.\n") log.error("You can alternatively place the .mmdb files into any of these folders:\n") for p in settings.search_geoip: log.error("\t - %s", p) log.error("") log.error("For most functionality, you'll need both GeoIP2 City + GeoIP2 ASN. The GeoIP2 City database generally provides " "everything in the Country database, plus City data - making the Country database generally redundant.\n") raise GeoIPDatabaseNotFound(f"Failed to locate GeoIP {geo_type.capitalize()} ({gdb.name})") if geo_type == 'city': settings.GEOCITY, settings.GEOCITY_DETECTED = gp, True if geo_type == 'country': settings.GEOCOUNTRY, settings.GEOCOUNTRY_DETECTED = gp, True if geo_type == 'asn': settings.GEOASN, settings.GEOASN_DETECTED = gp, True return gp
[docs] def get_geoip(geo_type: str, new_connection=False, thread_id=None, **geo_config) -> geoip2.database.Reader: """Get a GeoIP Reader object. Create one if it doesn't exist.""" from privex.helpers.common import empty_if, extract_settings gdb = get_geoip_db(geo_type) _geo_dbs = get_geodbs() if geo_type not in _geo_dbs: raise AttributeError("get_geoip: geo_type must be 'city', 'asn' or 'country'") if new_connection: return connect_geoip(gdb, **geo_config) tsname = f'geoip_{geo_type}' geo = _get_threadstore(tsname, thread_id=thread_id) if geo is None: geo = connect_geoip(gdb, **geo_config) _set_threadstore(tsname, geo, thread_id=thread_id) return geo
[docs] def close_geoip(geo_type: str, thread_id=None) -> bool: """ Close the global GeoIP connection and delete the instance. :param str geo_type: The GeoIP database type: either 'city', 'asn' or 'country' :param thread_id: Close and delete the Redis instance for this thread ID, instead of the detected current thread :return bool deleted: ``True`` if an instance was found and deleted. ``False`` if there was no existing Redis instance. """ tsname = f'geoip_{geo_type}' geo: Union[str, geoip2.database.Reader] = _get_threadstore(tsname, 'NOT_FOUND', thread_id=thread_id) if geo != 'NOT_FOUND': geo.close() clean_threadstore(thread_id=thread_id, name=tsname) return True return False
[docs] def reset_geoip(geo_type: str, thread_id=None) -> geoip2.database.Reader: """Close the global GeoIP connection, delete the instance, then re-instantiate it""" close_geoip(geo_type, thread_id=thread_id) return get_geoip(geo_type, thread_id=thread_id)
__all__ += ['get_geoip', 'get_geoip_db', 'get_geodbs', 'reset_geoip', 'close_geoip', 'connect_geoip'] HAS_GEOIP = True except ImportError: log.debug('%s failed to import "geoip2", GeoIP2 dependent helpers will be disabled.', __name__)