from __future__ import annotations import hashlib import hmac import os import socket import sys import typing import warnings from binascii import unhexlify from ..exceptions import ProxySchemeUnsupported, SSLError from .url import _BRACELESS_IPV6_ADDRZ_RE, _IPV4_RE SSLContext = None SSLTransport = None HAS_NEVER_CHECK_COMMON_NAME = False IS_PYOPENSSL = False ALPN_PROTOCOLS = ["http/1.1"] _TYPE_VERSION_INFO = tuple[int, int, int, str, int] # Maps the length of a digest to a possible hash function producing this digest HASHFUNC_MAP = { length: getattr(hashlib, algorithm, None) for length, algorithm in ((32, "md5"), (40, "sha1"), (64, "sha256")) } def _is_has_never_check_common_name_reliable( openssl_version: str, ) -> bool: # As of May 2023, all released versions of LibreSSL fail to reject certificates with # only common names, see https://github.com/urllib3/urllib3/pull/3024 is_openssl = openssl_version.startswith("OpenSSL ") return is_openssl if typing.TYPE_CHECKING: from ssl import VerifyMode from typing import TypedDict from .ssltransport import SSLTransport as SSLTransportType class _TYPE_PEER_CERT_RET_DICT(TypedDict, total=False): subjectAltName: tuple[tuple[str, str], ...] subject: tuple[tuple[tuple[str, str], ...], ...] serialNumber: str # Mapping from 'ssl.PROTOCOL_TLSX' to 'TLSVersion.X' _SSL_VERSION_TO_TLS_VERSION: dict[int, int] = {} try: # Do we have ssl at all? import ssl from ssl import ( # type: ignore[assignment] CERT_REQUIRED, HAS_NEVER_CHECK_COMMON_NAME, OP_NO_COMPRESSION, OP_NO_TICKET, OPENSSL_VERSION, PROTOCOL_TLS, PROTOCOL_TLS_CLIENT, VERIFY_X509_PARTIAL_CHAIN, VERIFY_X509_STRICT, OP_NO_SSLv2, OP_NO_SSLv3, SSLContext, TLSVersion, ) PROTOCOL_SSLv23 = PROTOCOL_TLS # Setting SSLContext.hostname_checks_common_name = False didn't work with # LibreSSL, check details in the used function. if HAS_NEVER_CHECK_COMMON_NAME and not _is_has_never_check_common_name_reliable( OPENSSL_VERSION, ): # Defensive: HAS_NEVER_CHECK_COMMON_NAME = False # Need to be careful here in case old TLS versions get # removed in future 'ssl' module implementations. for attr in ("TLSv1", "TLSv1_1", "TLSv1_2"): try: _SSL_VERSION_TO_TLS_VERSION[getattr(ssl, f"PROTOCOL_{attr}")] = getattr( TLSVersion, attr ) except AttributeError: # Defensive: continue from .ssltransport import SSLTransport # type: ignore[assignment] except ImportError: OP_NO_COMPRESSION = 0x20000 # type: ignore[assignment, misc] OP_NO_TICKET = 0x4000 # type: ignore[assignment, misc] OP_NO_SSLv2 = 0x1000000 # type: ignore[assignment, misc] OP_NO_SSLv3 = 0x2000000 # type: ignore[assignment, misc] PROTOCOL_SSLv23 = PROTOCOL_TLS = 2 # type: ignore[assignment, misc] PROTOCOL_TLS_CLIENT = 16 # type: ignore[assignment, misc] VERIFY_X509_PARTIAL_CHAIN = 0x80000 # type: ignore[assignment,misc] VERIFY_X509_STRICT = 0x20 # type: ignore[assignment, misc] _TYPE_PEER_CERT_RET = typing.Union["_TYPE_PEER_CERT_RET_DICT", bytes, None] def assert_fingerprint(cert: bytes | None, fingerprint: str) -> None: """ Checks if given fingerprint matches the supplied certificate. :param cert: Certificate as bytes object. :param fingerprint: Fingerprint as string of hexdigits, can be interspersed by colons. """ if cert is None: raise SSLError("No certificate for the peer.") fingerprint = fingerprint.replace(":", "").lower() digest_length = len(fingerprint) if digest_length not in HASHFUNC_MAP: raise SSLError(f"Fingerprint of invalid length: {fingerprint}") hashfunc = HASHFUNC_MAP.get(digest_length) if hashfunc is None: raise SSLError( f"Hash function implementation unavailable for fingerprint length: {digest_length}" ) # We need encode() here for py32; works on py2 and p33. fingerprint_bytes = unhexlify(fingerprint.encode()) cert_digest = hashfunc(cert).digest() if not hmac.compare_digest(cert_digest, fingerprint_bytes): raise SSLError( f'Fingerprints did not match. Expected "{fingerprint}", got "{cert_digest.hex()}"' ) def resolve_cert_reqs(candidate: None | int | str) -> VerifyMode: """ Resolves the argument to a numeric constant, which can be passed to the wrap_socket function/method from the ssl module. Defaults to :data:`ssl.CERT_REQUIRED`. If given a string it is assumed to be the name of the constant in the :mod:`ssl` module or its abbreviation. (So you can specify `REQUIRED` instead of `CERT_REQUIRED`. If it's neither `None` nor a string we assume it is already the numeric constant which can directly be passed to wrap_socket. """ if candidate is None: return CERT_REQUIRED if isinstance(candidate, str): res = getattr(ssl, candidate, None) if res is None: res = getattr(ssl, "CERT_" + candidate) return res # type: ignore[no-any-return] return candidate # type: ignore[return-value] def resolve_ssl_version(candidate: None | int | str) -> int: """ like resolve_cert_reqs """ if candidate is None: return PROTOCOL_TLS if isinstance(candidate, str): res = getattr(ssl, candidate, None) if res is None: res = getattr(ssl, "PROTOCOL_" + candidate) return typing.cast(int, res) return candidate def create_urllib3_context( ssl_version: int | None = None, cert_reqs: int | None = None, options: int | None = None, ciphers: str | None = None, ssl_minimum_version: int | None = None, ssl_maximum_version: int | None = None, verify_flags: int | None = None, ) -> ssl.SSLContext: """Creates and configures an :class:`ssl.SSLContext` instance for use with urllib3. :param ssl_version: The desired protocol version to use. This will default to PROTOCOL_SSLv23 which will negotiate the highest protocol that both the server and your installation of OpenSSL support. This parameter is deprecated instead use 'ssl_minimum_version'. :param ssl_minimum_version: The minimum version of TLS to be used. Use the 'ssl.TLSVersion' enum for specifying the value. :param ssl_maximum_version: The maximum version of TLS to be used. Use the 'ssl.TLSVersion' enum for specifying the value. Not recommended to set to anything other than 'ssl.TLSVersion.MAXIMUM_SUPPORTED' which is the default value. :param cert_reqs: Whether to require the certificate verification. This defaults to ``ssl.CERT_REQUIRED``. :param options: Specific OpenSSL options. These default to ``ssl.OP_NO_SSLv2``, ``ssl.OP_NO_SSLv3``, ``ssl.OP_NO_COMPRESSION``, and ``ssl.OP_NO_TICKET``. :param ciphers: Which cipher suites to allow the server to select. Defaults to either system configured ciphers if OpenSSL 1.1.1+, otherwise uses a secure default set of ciphers. :param verify_flags: The flags for certificate verification operations. These default to ``ssl.VERIFY_X509_PARTIAL_CHAIN`` and ``ssl.VERIFY_X509_STRICT`` for Python 3.13+. :returns: Constructed SSLContext object with specified options :rtype: SSLContext """ if SSLContext is None: raise TypeError("Can't create an SSLContext object without an ssl module") # This means 'ssl_version' was specified as an exact value. if ssl_version not in (None, PROTOCOL_TLS, PROTOCOL_TLS_CLIENT): # Disallow setting 'ssl_version' and 'ssl_minimum|maximum_version' # to avoid conflicts. if ssl_minimum_version is not None or ssl_maximum_version is not None: raise ValueError( "Can't specify both 'ssl_version' and either " "'ssl_minimum_version' or 'ssl_maximum_version'" ) # 'ssl_version' is deprecated and will be removed in the future. else: # Use 'ssl_minimum_version' and 'ssl_maximum_version' instead. ssl_minimum_version = _SSL_VERSION_TO_TLS_VERSION.get( ssl_version, TLSVersion.MINIMUM_SUPPORTED ) ssl_maximum_version = _SSL_VERSION_TO_TLS_VERSION.get( ssl_version, TLSVersion.MAXIMUM_SUPPORTED ) # This warning message is pushing users to use 'ssl_minimum_version' # instead of both min/max. Best practice is to only set the minimum version and # keep the maximum version to be it's default value: 'TLSVersion.MAXIMUM_SUPPORTED' warnings.warn( "'ssl_version' option is deprecated and will be " "removed in urllib3 v3.0. Instead use 'ssl_minimum_version'", category=FutureWarning, stacklevel=2, ) context = SSLContext(PROTOCOL_TLS_CLIENT) if ssl_minimum_version is not None: context.minimum_version = ssl_minimum_version else: # pyOpenSSL defaults to 'MINIMUM_SUPPORTED' so explicitly set TLSv1.2 here context.minimum_version = TLSVersion.TLSv1_2 if ssl_maximum_version is not None: context.maximum_version = ssl_maximum_version # Unless we're given ciphers defer to either system ciphers in # the case of OpenSSL 1.1.1+ or use our own secure default ciphers. if ciphers: context.set_ciphers(ciphers) # Setting the default here, as we may have no ssl module on import cert_reqs = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs if options is None: options = 0 # SSLv2 is easily broken and is considered harmful and dangerous options |= OP_NO_SSLv2 # SSLv3 has several problems and is now dangerous options |= OP_NO_SSLv3 # Disable compression to prevent CRIME attacks for OpenSSL 1.0+ # (issue #309) options |= OP_NO_COMPRESSION # TLSv1.2 only. Unless set explicitly, do not request tickets. # This may save some bandwidth on wire, and although the ticket is encrypted, # there is a risk associated with it being on wire, # if the server is not rotating its ticketing keys properly. options |= OP_NO_TICKET context.options |= options if verify_flags is None: verify_flags = 0 # In Python 3.13+ ssl.create_default_context() sets VERIFY_X509_PARTIAL_CHAIN # and VERIFY_X509_STRICT so we do the same if sys.version_info >= (3, 13): verify_flags |= VERIFY_X509_PARTIAL_CHAIN verify_flags |= VERIFY_X509_STRICT context.verify_flags |= verify_flags # Enable post-handshake authentication for TLS 1.3, see GH #1634. PHA is # necessary for conditional client cert authentication with TLS 1.3. # The attribute is None for OpenSSL <= 1.1.0 or does not exist when using # an SSLContext created by pyOpenSSL. if getattr(context, "post_handshake_auth", None) is not None: context.post_handshake_auth = True # The order of the below lines setting verify_mode and check_hostname # matter due to safe-guards SSLContext has to prevent an SSLContext with # check_hostname=True, verify_mode=NONE/OPTIONAL. # We always set 'check_hostname=False' for pyOpenSSL so we rely on our own # 'ssl.match_hostname()' implementation. if cert_reqs == ssl.CERT_REQUIRED and not IS_PYOPENSSL: context.verify_mode = cert_reqs context.check_hostname = True else: context.check_hostname = False context.verify_mode = cert_reqs context.hostname_checks_common_name = False if "SSLKEYLOGFILE" in os.environ: sslkeylogfile = os.path.expandvars(os.environ.get("SSLKEYLOGFILE")) else: sslkeylogfile = None if sslkeylogfile: context.keylog_filename = sslkeylogfile return context @typing.overload def ssl_wrap_socket( sock: socket.socket, keyfile: str | None = ..., certfile: str | None = ..., cert_reqs: int | None = ..., ca_certs: str | None = ..., server_hostname: str | None = ..., ssl_version: int | None = ..., ciphers: str | None = ..., ssl_context: ssl.SSLContext | None = ..., ca_cert_dir: str | None = ..., key_password: str | None = ..., ca_cert_data: None | str | bytes = ..., tls_in_tls: typing.Literal[False] = ..., ) -> ssl.SSLSocket: ... @typing.overload def ssl_wrap_socket( sock: socket.socket, keyfile: str | None = ..., certfile: str | None = ..., cert_reqs: int | None = ..., ca_certs: str | None = ..., server_hostname: str | None = ..., ssl_version: int | None = ..., ciphers: str | None = ..., ssl_context: ssl.SSLContext | None = ..., ca_cert_dir: str | None = ..., key_password: str | None = ..., ca_cert_data: None | str | bytes = ..., tls_in_tls: bool = ..., ) -> ssl.SSLSocket | SSLTransportType: ... def ssl_wrap_socket( sock: socket.socket, keyfile: str | None = None, certfile: str | None = None, cert_reqs: int | None = None, ca_certs: str | None = None, server_hostname: str | None = None, ssl_version: int | None = None, ciphers: str | None = None, ssl_context: ssl.SSLContext | None = None, ca_cert_dir: str | None = None, key_password: str | None = None, ca_cert_data: None | str | bytes = None, tls_in_tls: bool = False, ) -> ssl.SSLSocket | SSLTransportType: """ All arguments except for server_hostname, ssl_context, tls_in_tls, ca_cert_data and ca_cert_dir have the same meaning as they do when using :func:`ssl.create_default_context`, :meth:`ssl.SSLContext.load_cert_chain`, :meth:`ssl.SSLContext.set_ciphers` and :meth:`ssl.SSLContext.wrap_socket`. :param server_hostname: When SNI is supported, the expected hostname of the certificate :param ssl_context: A pre-made :class:`SSLContext` object. If none is provided, one will be created using :func:`create_urllib3_context`. :param ciphers: A string of ciphers we wish the client to support. :param ca_cert_dir: A directory containing CA certificates in multiple separate files, as supported by OpenSSL's -CApath flag or the capath argument to SSLContext.load_verify_locations(). :param key_password: Optional password if the keyfile is encrypted. :param ca_cert_data: Optional string containing CA certificates in PEM format suitable for passing as the cadata parameter to SSLContext.load_verify_locations() :param tls_in_tls: Use SSLTransport to wrap the existing socket. """ context = ssl_context if context is None: # Note: This branch of code and all the variables in it are only used in tests. # We should consider deprecating and removing this code. context = create_urllib3_context(ssl_version, cert_reqs, ciphers=ciphers) if ca_certs or ca_cert_dir or ca_cert_data: try: context.load_verify_locations(ca_certs, ca_cert_dir, ca_cert_data) except OSError as e: raise SSLError(e) from e elif ssl_context is None and hasattr(context, "load_default_certs"): # try to load OS default certs; works well on Windows. context.load_default_certs() # Attempt to detect if we get the goofy behavior of the # keyfile being encrypted and OpenSSL asking for the # passphrase via the terminal and instead error out. if keyfile and key_password is None and _is_key_file_encrypted(keyfile): raise SSLError("Client private key is encrypted, password is required") if certfile: if key_password is None: context.load_cert_chain(certfile, keyfile) else: context.load_cert_chain(certfile, keyfile, key_password) context.set_alpn_protocols(ALPN_PROTOCOLS) ssl_sock = _ssl_wrap_socket_impl(sock, context, tls_in_tls, server_hostname) return ssl_sock def is_ipaddress(hostname: str | bytes) -> bool: """Detects whether the hostname given is an IPv4 or IPv6 address. Also detects IPv6 addresses with Zone IDs. :param str hostname: Hostname to examine. :return: True if the hostname is an IP address, False otherwise. """ if isinstance(hostname, bytes): # IDN A-label bytes are ASCII compatible. hostname = hostname.decode("ascii") return bool(_IPV4_RE.match(hostname) or _BRACELESS_IPV6_ADDRZ_RE.match(hostname)) def _is_key_file_encrypted(key_file: str) -> bool: """Detects if a key file is encrypted or not.""" with open(key_file) as f: for line in f: # Look for Proc-Type: 4,ENCRYPTED if "ENCRYPTED" in line: return True return False def _ssl_wrap_socket_impl( sock: socket.socket, ssl_context: ssl.SSLContext, tls_in_tls: bool, server_hostname: str | None = None, ) -> ssl.SSLSocket | SSLTransportType: if tls_in_tls: if not SSLTransport: # Import error, ssl is not available. raise ProxySchemeUnsupported( "TLS in TLS requires support for the 'ssl' module" ) SSLTransport._validate_ssl_context_for_tls_in_tls(ssl_context) return SSLTransport(sock, ssl_context, server_hostname) return ssl_context.wrap_socket(sock, server_hostname=server_hostname)