Source code for privex.helpers.net.common

"""
General uncategorised functions/classes for network related helper code

**Copyright**::

        +===================================================+
        |                 © 2019 Privex Inc.                |
        |               https://www.privex.io               |
        +===================================================+
        |                                                   |
        |        Originally Developed by Privex Inc.        |
        |        License: X11 / MIT                         |
        |                                                   |
        |        Core Developer(s):                         |
        |                                                   |
        |          (+)  Chris (@someguy123) [Privex]        |
        |          (+)  Kale (@kryogenic) [Privex]          |
        |                                                   |
        +===================================================+

    Copyright 2019     Privex Inc.   ( https://www.privex.io )

"""
import asyncio
import logging
import random
import socket
from datetime import datetime
from math import ceil
from typing import List, Tuple

from privex.helpers.decorators import r_cache, r_cache_async

from privex.helpers import settings
from privex.helpers.common import byteify, empty, empty_if, is_true
from privex.helpers.asyncx import run_coro_thread_async
from privex.helpers.net import base as netbase
from privex.helpers.net.dns import resolve_ip, resolve_ip_async
from privex.helpers.net.socket import AsyncSocketWrapper
from privex.helpers.net.util import get_ssl_context, ip_is_v6
from privex.helpers.types import AUTO, AnyNum, IP_OR_STR

log = logging.getLogger(__name__)

__all__ = [
    'check_host', 'check_host_async', 'check_host_http', 'check_host_http_async', 'test_hosts_async',
    'test_hosts', 'check_v4', 'check_v6', 'check_v4_async', 'check_v6_async'
]


[docs]def check_host(host: IP_OR_STR, port: AnyNum, version='any', throw=False, **kwargs) -> bool: """ Test if the service on port ``port`` for host ``host`` is working. AsyncIO version: :func:`.check_host_async` Basic usage (services which send the client data immediately after connecting):: >>> check_host('hiveseed-se.privex.io', 2001) True >>> check_host('hiveseed-se.privex.io', 9991) False For some services, such as HTTP - it's necessary to transmit some data to the host before it will send a response. Using the ``send`` kwarg, you can transmit an arbitrary string/bytes upon connection. Sending data to ``host`` after connecting:: >>> check_host('files.privex.io', 80, send=b"GET / HTTP/1.1\\n\\n") True :param str|IPv4Address|IPv6Address host: Hostname or IP to test :param int|str port: Port number on ``host`` to connect to :param str|int version: When connecting to a hostname, this can be set to ``'v4'``, ``'v6'`` or similar to ensure the connection is via that IP version :param bool throw: (default: ``False``) When ``True``, will raise exceptions instead of returning ``False`` :param kwargs: Additional configuration options (see below) :keyword int receive: (default: ``100``) Amount of bytes to attempt to receive from the server (``0`` to disable) :keyword bytes|str send: If ``send`` is specified, the data in ``send`` will be transmitted to the server before receiving. :keyword int stype: Socket type, e.g. :attr:`socket.SOCK_STREAM` :keyword float|int timeout: Socket timeout. If not passed, uses the default from :func:`socket.getdefaulttimeout`. If the global default timeout is ``None``, then falls back to ``5.0`` :raises socket.timeout: When ``throw=True`` and a timeout occurs. :raises socket.gaierror: When ``throw=True`` and various errors occur :raises ConnectionRefusedError: When ``throw=True`` and the connection was refused :raises ConnectionResetError: When ``throw=True`` and the connection was reset :return bool success: ``True`` if successfully connected + sent/received data. Otherwise ``False``. """ receive, stype = int(kwargs.get('receive', 100)), kwargs.get('stype', socket.SOCK_STREAM) timeout, send, use_ssl = kwargs.get('timeout', 'n/a'), kwargs.get('send'), kwargs.get('ssl', kwargs.get('use_ssl')) ssl_params = kwargs.get('ssl_params', dict(verify_cert=False, check_hostname=False)) if timeout == 'n/a': t = socket.getdefaulttimeout() timeout = 10.0 if not t else t try: s_ver = socket.AF_INET ip = resolve_ip(host, version) if ip_is_v6(ip): s_ver = socket.AF_INET6 if port == 443 and use_ssl is None: log.warning("check_host: automatically setting use_ssl=True as port is 443 and use_ssl was not specified.") use_ssl = True with socket.socket(s_ver, stype) as s: orig_sock = s if timeout: s.settimeout(float(timeout)) if use_ssl: ctx = get_ssl_context(**ssl_params) s = ctx.wrap_socket( s, server_hostname=kwargs.get('server_hostname'), session=kwargs.get('session'), do_handshake_on_connect=kwargs.get('do_handshake_on_connect', True), ) s.connect((ip, int(port))) if not empty(send): s.sendall(byteify(send)) if receive > 0: s.recv(int(receive)) if use_ssl: s.close() return True except (socket.timeout, TimeoutError, ConnectionRefusedError, ConnectionResetError, socket.gaierror) as e: if throw: raise e return False
[docs]async def check_host_async(host: IP_OR_STR, port: AnyNum, version='any', throw=False, **kwargs) -> bool: """ AsyncIO version of :func:`.check_host`. Test if the service on port ``port`` for host ``host`` is working. Basic usage (services which send the client data immediately after connecting):: >>> await check_host_async('hiveseed-se.privex.io', 2001) True >>> await check_host_async('hiveseed-se.privex.io', 9991) False For some services, such as HTTP - it's necessary to transmit some data to the host before it will send a response. Using the ``send`` kwarg, you can transmit an arbitrary string/bytes upon connection. Sending data to ``host`` after connecting:: >>> await check_host_async('files.privex.io', 80, send=b"GET / HTTP/1.1\\n\\n") True :param str|IPv4Address|IPv6Address host: Hostname or IP to test :param int|str port: Port number on ``host`` to connect to :param str|int version: When connecting to a hostname, this can be set to ``'v4'``, ``'v6'`` or similar to ensure the connection is via that IP version :param bool throw: (default: ``False``) When ``True``, will raise exceptions instead of returning ``False`` :param kwargs: Additional configuration options (see below) :keyword int receive: (default: ``100``) Amount of bytes to attempt to receive from the server (``0`` to disable) :keyword bytes|str send: If ``send`` is specified, the data in ``send`` will be transmitted to the server before receiving. :keyword int stype: Socket type, e.g. :attr:`socket.SOCK_STREAM` :keyword float|int timeout: Socket timeout. If not passed, uses the default from :func:`socket.getdefaulttimeout`. If the global default timeout is ``None``, then falls back to ``5.0`` :raises socket.timeout: When ``throw=True`` and a timeout occurs. :raises socket.gaierror: When ``throw=True`` and various errors occur :raises ConnectionRefusedError: When ``throw=True`` and the connection was refused :raises ConnectionResetError: When ``throw=True`` and the connection was reset :return bool success: ``True`` if successfully connected + sent/received data. Otherwise ``False``. """ receive, stype = int(kwargs.get('receive', 16)), kwargs.get('stype', socket.SOCK_STREAM) timeout, send = kwargs.get('timeout', 'n/a'), kwargs.get('send') http_test, use_ssl = kwargs.get('http_test', False), kwargs.get('use_ssl', False) if timeout == 'n/a': t = socket.getdefaulttimeout() timeout = settings.DEFAULT_SOCKET_TIMEOUT if not t else t # loop = asyncio.get_event_loop() s_ver = socket.AF_INET ip = await resolve_ip_async(host, version) if ip_is_v6(ip): s_ver = socket.AF_INET6 try: aw = AsyncSocketWrapper(host, int(port), family=s_ver, use_ssl=use_ssl, timeout=timeout) await aw.connect() if http_test: log.info("Sending HTTP request to %s", host) log.info("Response from %s : %s", host, await aw.http_request()) elif not empty(send) and receive > 0: log.info("Sending query data '%s' and trying to receive data from %s", send, host) log.info("Response from %s : %s", host, await aw.query(send, receive, read_timeout=kwargs.get('read_timeout', AUTO))) elif not empty(send): log.info("Sending query data '%s' to %s", send, host) await aw.sendall(send) else: log.info("Receiving data from %s", host) log.info("Response from %s : %s", host, await aw.read_eof( receive, strip=False, read_timeout=kwargs.get('read_timeout', AUTO), )) # with socket.socket(s_ver, stype) as s: # if timeout: s.settimeout(float(timeout)) # await loop.sock_connect(s, (ip, int(port))) # if not empty(send): # await loop.sock_sendall(s, byteify(send)) # if receive > 0: # await loop.sock_recv(s, int(receive)) return True except (socket.timeout, TimeoutError, ConnectionRefusedError, ConnectionResetError, socket.gaierror) as e: if throw: raise e return False
def check_host_http(host: IP_OR_STR, port: AnyNum = 80, version='any', throw=False, **kwargs) -> bool: return netbase.check_host(host, port, version, throw=throw, http_test=True, **kwargs) async def check_host_http_async( host: IP_OR_STR, port: AnyNum = 80, version='any', throw=False, send=b"GET / HTTP/1.1\\n\\n", **kwargs ) -> bool: # return await check_host_async(host, port, version, throw=throw, send=send, **kwargs) return await netbase.check_host_async(host, port, version, throw=throw, http_test=True, **kwargs) async def test_hosts_async(hosts: List[str] = None, ipver: str = 'any', timeout: AnyNum = None, **kwargs) -> bool: randomise = is_true(kwargs.get('randomise', True)) max_hosts = kwargs.get('max_hosts', settings.NET_CHECK_HOST_COUNT_TRY) if max_hosts is not None: max_hosts = int(max_hosts) timeout = empty_if(timeout, empty_if(socket.getdefaulttimeout(), 4, zero=True), zero=True) v4h, v6h = list(settings.V4_TEST_HOSTS), list(settings.V6_TEST_HOSTS) if randomise: random.shuffle(v4h) if randomise: random.shuffle(v6h) if empty(hosts, True, True): # if empty(ipver, True, True) or ipver in ['any', 'all', 'both', 10, '10', '46', 46]: # settings.V4_CHECKED_AT if isinstance(ipver, str): ipver = ipver.lower() if ipver in [4, '4', 'v4', 'ipv4']: hosts = v4h ipver = 4 elif ipver in [6, '6', 'v6', 'ipv6']: hosts = v6h ipver = 6 else: ipver = 'any' if max_hosts: hosts = v4h[:int(ceil(max_hosts / 2))] + v6h[:int(ceil(max_hosts / 2))] else: hosts = v4h + v6h if max_hosts: hosts = hosts[:max_hosts] # st4_empty = any([empty(settings.HAS_WORKING_V4, True, True), empty(settings.V4_CHECKED_AT, True, True)]) # st6_empty = any([empty(settings.HAS_WORKING_V6, True, True), empty(settings.V6_CHECKED_AT, True, True)]) # if ipver == 6 and not st6_empty and settings.V6_CHECKED_AT > datetime.utcnow(): # # if settings.V6_CHECKED_AT > datetime.utcnow() # log.debug("Returning cached IPv6 status: working = %s", settings.HAS_WORKING_V6) # return settings.HAS_WORKING_V6 # if ipver == 4 and not st4_empty and settings.V4_CHECKED_AT > datetime.utcnow(): # # if settings.V6_CHECKED_AT > datetime.utcnow() # log.debug("Returning cached IPv4 status: working = %s", settings.HAS_WORKING_V4) # return settings.HAS_WORKING_V4 # # if ipver == 'any' and any([not st4_empty, not st6_empty]) and settings.V4_CHECKED_AT > datetime.utcnow(): # # if settings.V6_CHECKED_AT > datetime.utcnow() # if st4_empty: # log.debug("test_hosts being requested for 'any' ip ver. IPv6 status cached, but not IPv4 status. Checking IPv4 status...") # await check_v4_async() # if st6_empty: # log.debug("test_hosts being requested for 'any' ip ver. IPv4 status cached, but not IPv6 status. Checking IPv6 status...") # await check_v6_async(hosts) # # if not st4_empty and not st6_empty: # log.debug( # "Returning status %s based on: Working IPv4 = %s || Working IPv6 = %s", # settings.HAS_WORKING_V4 or settings.HAS_WORKING_V6, settings.HAS_WORKING_V4, settings.HAS_WORKING_V6 # ) # return settings.HAS_WORKING_V4 or settings.HAS_WORKING_V6 # max_hosts = int(kwargs.get('max_hosts', settings.NET_CHECK_HOST_COUNT_TRY)) min_hosts_pos = int(kwargs.get('required_positive', settings.NET_CHECK_HOST_COUNT)) # hosts = empty_if(hosts, settings.V4_TEST_HOSTS, itr=True) hosts = [x for x in hosts] if randomise: random.shuffle(hosts) if len(hosts) > max_hosts: hosts = hosts[:max_hosts] # port = empty_if(port, 80, zero=True) total_hosts = len(hosts) total_working, total_broken = 0, 0 working_list, broken_list = [], [] log.debug("Testing %s hosts with IP version '%s' - timeout: %s", total_hosts, ipver, timeout) host_checks = [] host_checks_hosts = [] for h in hosts: # host_checks.append( # asyncio.create_task(_test_host_async(h, ipver=ipver, timeout=timeout)) # ) host_checks.append( asyncio.create_task( run_coro_thread_async(_test_host_async, h, ipver=ipver, timeout=timeout) ) ) host_checks_hosts.append(h) host_checks_res = await asyncio.gather(*host_checks, return_exceptions=True) for i, _res in enumerate(host_checks_res): h = host_checks_hosts[i] if isinstance(_res, Exception): log.warning("Exception while checking host %s", h) total_broken += 1 continue res, h, port = _res if res: total_working += 1 working_list.append(f"{h}:{port}") log.debug("check_host for %s (port %s) came back True (WORKING). incremented working hosts: %s", h, port, total_working) else: total_broken += 1 broken_list.append(f"{h}:{port}") log.debug("check_host for %s (port %s) came back False (! BROKEN !). incremented broken hosts: %s", h, port, total_broken) # port = 80 # for h in hosts: # try: # h, port, res = await _test_host_async(h, ipver, timeout) # if res: # total_working += 1 # log.debug("check_host for %s came back true. incremented working hosts: %s", h, total_working) # else: # total_broken += 1 # log.debug("check_host for %s came back false. incremented broken hosts: %s", h, total_broken) # # except Exception as e: # log.warning("Exception while checking host %s port %s", h, port) working = total_working >= min_hosts_pos log.info("test_hosts - proto: %s - protocol working? %s || total hosts: %s || working hosts: %s || broken hosts: %s", ipver, working, total_hosts, total_working, total_broken) log.debug("working hosts: %s", working_list) log.debug("broken hosts: %s", broken_list) return working async def _test_host_async(host, ipver: str = 'any', timeout: AnyNum = None) -> Tuple[bool, str, int]: nh = host.split(':') if len(nh) > 1: port = int(nh[-1]) host = ':'.join(nh[:-1]) else: host = ':'.join(nh) log.warning("Host is missing port: %s - falling back to port 80") port = 80 log.debug("Checking host %s via port %s + IP version '%s'", host, port, ipver) if port == 80: res = await check_host_http_async(host, port, ipver, throw=False, timeout=timeout) elif port == 53: res = await netbase.check_host_async(host, port, ipver, throw=False, timeout=timeout, send="hello\nworld\n") else: res = await netbase.check_host_async(host, port, ipver, throw=False, timeout=timeout) return res, host, port def test_hosts(hosts: List[str] = None, ipver: str = 'any', timeout: AnyNum = None, **kwargs) -> bool: randomise = is_true(kwargs.get('randomise', True)) max_hosts = kwargs.get('max_hosts', settings.NET_CHECK_HOST_COUNT_TRY) if max_hosts is not None: max_hosts = int(max_hosts) timeout = empty_if(timeout, empty_if(socket.getdefaulttimeout(), 4, zero=True), zero=True) v4h, v6h = list(settings.V4_TEST_HOSTS), list(settings.V6_TEST_HOSTS) if randomise: random.shuffle(v4h) if randomise: random.shuffle(v6h) if empty(hosts, True, True): # if empty(ipver, True, True) or ipver in ['any', 'all', 'both', 10, '10', '46', 46]: # settings.V4_CHECKED_AT if isinstance(ipver, str): ipver = ipver.lower() if ipver in [4, '4', 'v4', 'ipv4']: hosts = v4h ipver = 4 elif ipver in [6, '6', 'v6', 'ipv6']: hosts = v6h ipver = 6 else: ipver = 'any' if max_hosts: hosts = v4h[:int(ceil(max_hosts / 2))] + v6h[:int(ceil(max_hosts / 2))] else: hosts = v4h + v6h if max_hosts: hosts = hosts[:max_hosts] # st4_empty = any([empty(settings.HAS_WORKING_V4, True, True), empty(settings.V4_CHECKED_AT, True, True)]) # st6_empty = any([empty(settings.HAS_WORKING_V6, True, True), empty(settings.V6_CHECKED_AT, True, True)]) # if ipver == 6 and not st6_empty and settings.V6_CHECKED_AT > datetime.utcnow(): # # if settings.V6_CHECKED_AT > datetime.utcnow() # log.debug("Returning cached IPv6 status: working = %s", settings.HAS_WORKING_V6) # return settings.HAS_WORKING_V6 # if ipver == 4 and not st4_empty and settings.V4_CHECKED_AT > datetime.utcnow(): # # if settings.V6_CHECKED_AT > datetime.utcnow() # log.debug("Returning cached IPv4 status: working = %s", settings.HAS_WORKING_V4) # return settings.HAS_WORKING_V4 # if ipver == 'any' and any([not st4_empty, not st6_empty]) and settings.V4_CHECKED_AT > datetime.utcnow(): # # if settings.V6_CHECKED_AT > datetime.utcnow() # if st4_empty: # log.debug("test_hosts being requested for 'any' ip ver. IPv6 status cached, but not IPv4 status. Checking IPv4 status...") # check_v4() # if st6_empty: # log.debug("test_hosts being requested for 'any' ip ver. IPv4 status cached, but not IPv6 status. Checking IPv6 status...") # check_v6() # # if not st4_empty and not st6_empty: # log.debug( # "Returning status %s based on: Working IPv4 = %s || Working IPv6 = %s", # settings.HAS_WORKING_V4 or settings.HAS_WORKING_V6, settings.HAS_WORKING_V4, settings.HAS_WORKING_V6 # ) # return settings.HAS_WORKING_V4 or settings.HAS_WORKING_V6 # max_hosts = int(kwargs.get('max_hosts', settings.NET_CHECK_HOST_COUNT_TRY)) min_hosts_pos = int(kwargs.get('required_positive', settings.NET_CHECK_HOST_COUNT)) # hosts = empty_if(hosts, settings.V4_TEST_HOSTS, itr=True) hosts = [x for x in hosts] if randomise: random.shuffle(hosts) if len(hosts) > max_hosts: hosts = hosts[:max_hosts] total_hosts = len(hosts) total_working, total_broken = 0, 0 log.debug("Testing %s hosts with IP version '%s' - timeout: %s", total_hosts, ipver, timeout) port = 80 for h in hosts: try: nh = h.split(':') if len(nh) > 1: port = int(nh[-1]) h = ':'.join(nh[:-1]) else: h = ':'.join(nh) log.warning("Host is missing port: %s - falling back to port 80") port = 80 log.debug("Checking host %s via port %s + IP version '%s'", h, port, ipver) if port == 80: res = check_host_http(h, port, ipver, throw=False, timeout=timeout) else: res = check_host(h, port, ipver, throw=False, timeout=timeout) if res: total_working += 1 log.debug("check_host for %s came back true. incremented working hosts: %s", h, total_working) else: total_broken += 1 log.debug("check_host for %s came back false. incremented broken hosts: %s", h, total_broken) except Exception as e: log.warning("Exception while checking host %s port %s", h, port) working = total_working >= min_hosts_pos log.info("test_hosts - proto: %s - protocol working? %s || total hosts: %s || working hosts: %s || broken hosts: %s", ipver, working, total_hosts, total_working, total_broken) return working @r_cache("pvxhelpers:check_v4", settings.NET_CHECK_TIMEOUT) def check_v4(hosts: List[str] = None, *args, **kwargs) -> bool: """Check and cache whether IPv4 is functional by testing a handful of IPv4 hosts""" return test_hosts(hosts, ipver='v4', *args, **kwargs) @r_cache("pvxhelpers:check_v6", settings.NET_CHECK_TIMEOUT) def check_v6(hosts: List[str] = None, *args, **kwargs) -> bool: """Check and cache whether IPv6 is functional by testing a handful of IPv6 hosts""" return test_hosts(hosts, ipver='v6', *args, **kwargs) @r_cache_async("pvxhelpers:check_v4", settings.NET_CHECK_TIMEOUT) async def check_v4_async(hosts: List[str] = None, *args, **kwargs) -> bool: """(Async ver of :func:`.check_v4`) Check and cache whether IPv4 is functional by testing a handful of IPv4 hosts""" return await test_hosts_async(hosts, ipver='v4', *args, **kwargs) @r_cache_async("pvxhelpers:check_v6", settings.NET_CHECK_TIMEOUT) async def check_v6_async(hosts: List[str] = None, *args, **kwargs) -> bool: """(Async ver of :func:`.check_v6`) Check and cache whether IPv6 is functional by testing a handful of IPv6 hosts""" return await test_hosts_async(hosts, ipver='v6', *args, **kwargs)