# Source code for privex.helpers.net.util

import platform
import socket
import ssl
import subprocess
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network, ip_address, ip_network
from typing import List, Optional, Union
from privex.helpers import settings
from privex.helpers.common import empty_if, byteify, is_true, stringify
from privex.helpers.exceptions import NetworkUnreachable
from privex.helpers.types import IP_OR_STR, STRBYTES

# Names exported when this module is star-imported (``from <module> import *``).
# NOTE(review): ``ip_sock_ver``, ``get_ssl_context`` and ``generate_http_request`` are defined in this
# module but are not listed here - confirm whether leaving them out is intentional.
__all__ = [
    'ip_is_v4', 'ip_is_v6', 'ping', 'IPV4_ALIASES', 'IPV6_ALIASES', 'ip_ver_to_int', 'ip_ver_to_sock',
    'sock_ver', 'is_ip', 'sock_validate_ip'
]


def ip_is_v4(ip: str) -> bool:
    """
    Determines whether an IP address is IPv4 or not

    :param str ip: An IP address as a string, e.g. 192.168.1.1
    :raises ValueError: When the given IP address ``ip`` is invalid
    :return bool: True if IPv4, False if not (i.e. probably IPv6)
    """
    # ip_address() parses the value and raises ValueError for anything that is not a valid v4/v6 address.
    return isinstance(ip_address(ip), IPv4Address)
def ip_is_v6(ip: str) -> bool:
    """
    Determines whether an IP address is IPv6 or not

    :param str ip: An IP address as a string, e.g. 2a07:e00::1
    :raises ValueError: When the given IP address ``ip`` is invalid
    :return bool: True if IPv6, False if not (i.e. probably IPv4)
    """
    # ip_address() parses the value and raises ValueError for anything that is not a valid v4/v6 address.
    return isinstance(ip_address(ip), IPv6Address)
def ping(ip: str, timeout: int = 30) -> bool:
    """
    Sends a ping to a given IPv4 / IPv6 address. Tested with IPv4+IPv6 using ``iputils-ping`` on Linux, as well as the
    default IPv4 ``ping`` utility on Mac OSX (Mojave, 10.14.6).

    Fully supported when using Linux with the ``iputils-ping`` package. Only IPv4 support on Mac OSX.

    **Example Usage**::

        >>> from privex.helpers import ping
        >>> if ping('127.0.0.1', 5) and ping('::1', 10):
        ...     print('Both 127.0.0.1 and ::1 are up')
        ... else:
        ...     print('127.0.0.1 or ::1 failed to respond to a ping within the given timeout.')

    **Known Incompatibilities**:

     * NOT compatible with IPv6 addresses on OSX due to the lack of a timeout argument with ``ping6``
     * NOT compatible with IPv6 addresses when using ``inetutils-ping`` on Linux due to separate ``ping6`` command

    :param str ip: An IP address as a string, e.g. ``192.168.1.1`` or ``2a07:e00::1``
    :param int timeout: (Default: 30) Number of seconds to wait for a response from the ping before timing out
    :raises ValueError: When the given IP address ``ip`` is invalid or ``timeout`` < 1
    :raises NotImplementedError: When there is no known ping command for the current platform / IP version
    :raises NetworkUnreachable: When ping reports "network is unreachable" on stderr
    :return bool: ``True`` if ping got a response from the given IP, ``False`` if not
    """
    ip_obj = ip_address(ip)  # verify IP is valid (this will throw if it isn't)
    if timeout < 1:
        raise ValueError('timeout value cannot be less than 1 second')

    # Per-platform command lines. The timeout flag differs between Linux (-w) and OSX (-t).
    opts4 = {
        'Linux': ["/bin/ping", "-c1", f"-w{timeout}"],
        'Darwin': ["/sbin/ping", "-c1", f"-t{timeout}"]
    }
    opts6 = {'Linux': ["/bin/ping", "-c1", f"-w{timeout}"]}
    opts = opts4 if isinstance(ip_obj, IPv4Address) else opts6

    system = platform.system()
    if system not in opts:
        raise NotImplementedError(f"{__name__}.ping is not fully supported on platform '{system}'...")

    # The address is passed as its own argv element (no shell), so it cannot be interpreted as shell syntax.
    with subprocess.Popen(opts[system] + [str(ip)], stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc:
        out, err = proc.communicate()

    # errors='replace' so that unexpected bytes in ping's output can't raise UnicodeDecodeError.
    err = err.decode('utf-8', errors='replace')
    if 'network is unreachable' in err.lower():
        raise NetworkUnreachable(f'Got error from ping: "{err}"')

    out = out.decode('utf-8', errors='replace')
    # Match both the address exactly as the caller spelled it and its canonical form, since the reply line
    # may show the canonical spelling (e.g. '::1') when the caller passed an expanded IPv6 address.
    return any('bytes from {}'.format(addr) in out for addr in {str(ip), str(ip_obj)})
# Values accepted as meaning "IPv4" / "IPv6" by ip_ver_to_int() and sock_ver().
IPV4_ALIASES = [4, 'v4', '4', 'ipv4', 'ip4', 'inet', 'inet4', socket.AF_INET, str(socket.AF_INET)]
IPV6_ALIASES = [6, 'v6', '6', 'ipv6', 'ip6', 'inet6', socket.AF_INET6, str(socket.AF_INET6)]


def _ver_keys(value) -> set:
    """Return the normalised (lower-cased string) forms under which ``value`` can match an alias."""
    keys = {str(value).lower()}
    if isinstance(value, int):
        # str() of an enum member such as socket.AF_INET is not its number on every Python version
        # (e.g. 'AddressFamily.AF_INET'), so always include the plain integer form as well.
        keys.add(str(int(value)))
    return keys


def ip_ver_to_int(ver: Union[str, int]) -> int:
    """
    Convert an IP version alias (see :attr:`.IPV4_ALIASES` / :attr:`.IPV6_ALIASES`) into the integer ``4`` or ``6``.

    Matching is case-insensitive, and is done on normalised keys of both the input and the aliases, so that
    ``socket.AF_INET`` / ``socket.AF_INET6`` match regardless of how ``str()`` renders them.

    :param str|int ver: An IP version alias, e.g. ``4``, ``'v6'``, ``'inet'``, ``socket.AF_INET6``
    :return int: ``4`` for an IPv4 alias, ``6`` for an IPv6 alias, ``0`` if ``ver`` matches neither
    """
    wanted = _ver_keys(ver)
    # The alias lists are re-read on every call, so runtime changes to them are honoured.
    if any(wanted & _ver_keys(alias) for alias in IPV4_ALIASES):
        return 4
    if any(wanted & _ver_keys(alias) for alias in IPV6_ALIASES):
        return 6
    return 0


def sock_ver(version) -> Optional[int]:
    """
    Convert an IP version alias into the matching :mod:`socket` address family.

    :param version: An IP version alias accepted by :func:`.ip_ver_to_int`. Values which ``empty_if`` considers
                    empty (called with ``zero=True, itr=True``) are replaced by ``'any'``, which matches no alias.
    :return Optional[int]: ``socket.AF_INET`` for IPv4, ``socket.AF_INET6`` for IPv6, otherwise ``None``
    """
    version = empty_if(version, 'any', zero=True, itr=True)
    family = ip_ver_to_int(str(version).lower())
    if family == 4:
        return socket.AF_INET
    if family == 6:
        return socket.AF_INET6
    return None


ip_ver_to_sock = sock_ver


def ip_sock_ver(ip_addr) -> Optional[int]:
    """
    Determine the :mod:`socket` address family of an IP address or network.

    :param ip_addr: An IP address or network (anything accepted by :func:`ipaddress.ip_network`); host bits may be set
    :raises ValueError: When ``ip_addr`` is not a valid IPv4 / IPv6 address or network
    :return Optional[int]: ``socket.AF_INET`` for IPv4, ``socket.AF_INET6`` for IPv6
    """
    # strict=False allows host bits to be set, so plain addresses such as '192.168.1.1' are accepted too.
    a = ip_network(ip_addr, strict=False)
    if isinstance(a, (IPv4Address, IPv4Network)):
        return socket.AF_INET
    if isinstance(a, (IPv6Address, IPv6Network)):
        return socket.AF_INET6
    return None


def is_ip(addr: str, version: int = None):
    """
    Check whether ``addr`` is a valid IP address, optionally restricted to one address family.

    :param str addr: The value to check
    :param int version: Optional ``socket.AF_INET`` / ``socket.AF_INET6`` to require a specific IP version
    :raises AttributeError: When ``addr`` is a valid IP but not of the requested ``version`` (propagated unchanged
                            from :func:`.sock_validate_ip`)
    :return: The parsed address object (truthy) if ``addr`` is a valid IP, ``False`` if it is not
    """
    try:
        return sock_validate_ip(addr, version=version)
    except ValueError:
        # Raised by ip_address() for anything that is not an IP. AttributeError is deliberately not caught here.
        return False


def sock_validate_ip(addr: IP_OR_STR, version: int, throw=True) -> Optional[Union[IPv4Address, IPv6Address]]:
    """
    Parse ``addr`` as an IP address and verify it belongs to the requested socket address family.

    :param addr: The IP address to validate
    :param int version: ``socket.AF_INET`` to require IPv4, ``socket.AF_INET6`` to require IPv6. Any other value
                        (e.g. ``None``) skips the version check.
    :param bool throw: (Default: ``True``) If ``True``, raise on a version mismatch; if ``False`` return ``None``
    :raises ValueError: When ``addr`` is not a valid IP address
    :raises AttributeError: When ``addr`` does not match the requested ``version`` and ``throw`` is true
    :return: The parsed :class:`ipaddress.IPv4Address` / :class:`ipaddress.IPv6Address`, or ``None`` on a
             version mismatch when ``throw`` is false
    """
    ip = ip_address(addr)
    ver = "v4" if ip.version == 4 else "v6"

    if version == socket.AF_INET and ver != 'v4':
        if not throw:
            return None
        raise AttributeError(f"Passed address '{addr}' was an IPv6 address, but 'version' requested an IPv4 address.")
    if version == socket.AF_INET6 and ver != 'v6':
        if not throw:
            return None
        raise AttributeError(f"Passed address '{addr}' was an IPv4 address, but 'version' requested an IPv6 address.")
    return ip


def get_ssl_context(
        verify_cert: bool = False, check_hostname: Optional[bool] = None, verify_mode: Optional[int] = None, **kwargs
) -> ssl.SSLContext:
    """
    Create an :class:`ssl.SSLContext` with certificate verification / hostname checking configured.

    NOTE: with the defaults (``verify_cert=False``) the returned context does NOT verify certificates.

    :param bool verify_cert: Used to derive the defaults for ``check_hostname`` and ``verify_mode``
    :param bool check_hostname: Overrides the hostname-checking setting derived from ``verify_cert``
    :param int verify_mode: Overrides the ``ssl.CERT_*`` mode derived from ``verify_cert``
    :param kwargs: Accepted but not used by this function
    :return ssl.SSLContext: The configured context
    """
    check_hostname = empty_if(check_hostname, is_true(verify_cert))
    verify_mode = empty_if(verify_mode, ssl.CERT_REQUIRED if verify_cert else ssl.CERT_NONE)

    ctx = ssl.create_default_context()
    # Order matters: check_hostname must be switched off before verify_mode can be set to CERT_NONE.
    ctx.check_hostname = check_hostname
    ctx.verify_mode = verify_mode
    return ctx


def generate_http_request(
        url="/", host=None, method="GET", user_agent=settings.DEFAULT_USER_AGENT,
        extra_data: Union[STRBYTES, List[str]] = None, body: STRBYTES = None, **kwargs
) -> bytes:
    """
    Build the raw bytes of a simple HTTP request.

    :param url: The request target placed on the request line
    :param host: If not ``None``, emitted as a ``Host:`` header
    :param method: The HTTP method (upper-cased on the request line)
    :param user_agent: If not ``None``, emitted as a ``User-Agent:`` header
    :param extra_data: Extra data appended after the headers - either a single str/bytes blob, or a list of
                       lines which are joined with newlines
    :param body: Optional data appended after ``extra_data``
    :key http_ver: (Default: ``'1.0'``) The HTTP version placed on the request line
    :return bytes: The request, always terminated by a blank line

    NOTE(review): lines are terminated with a bare ``\\n`` rather than ``\\r\\n``, and ``body`` is appended directly
    after the headers with the blank line coming *after* it - confirm this is what consumers expect.
    """
    method, url = stringify(method), stringify(url)
    http_ver = stringify(kwargs.get('http_ver', '1.0'))

    data = f"{method.upper()} {url} HTTP/{http_ver}\n"
    if host is not None:
        data += f"Host: {stringify(host)}\n"
    if user_agent is not None:
        data += f"User-Agent: {stringify(user_agent)}\n"

    data = byteify(data)

    if extra_data is not None:
        if isinstance(extra_data, list):
            extra_data = [byteify(x) for x in extra_data]
            data += b"\n".join(extra_data)
        else:
            data += byteify(extra_data)
        if not data.endswith(b"\n"):
            data += b"\n"

    if body is not None:
        data += byteify(body)

    # Ensure the request ends with exactly one blank line.
    if not data.endswith(b"\n"):
        data += b"\n"
    if not data.endswith(b"\n\n"):
        data += b"\n"
    return data