From 6c7df68c7cb7b2a08e3ea8da3c53231530001065 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=2E=20K=C3=A4rkk=C3=A4inen?= <98187+Tronic@users.noreply.github.com> Date: Thu, 28 Oct 2021 14:50:05 +0100 Subject: [PATCH] Vhost support using multiple TLS certificates (#2270) * Initial support for using multiple SSL certificates. * Also list IP address subjectAltNames on log. * Use Python 3.7+ way of specifying TLSv1.2 as the minimum version. Linter fixes. * isort * Cleanup, store server name for later use. Add RSA ciphers. Log rejected SNIs. * Cleanup, linter. * Alter the order of initial log messages and handling. In particular, enable debug mode early so that debug messages during init can be shown. * Store server name (SNI) to conn_info. * Update test with new error message. * Refactor for readability. * Cleanup * Replace old expired test cert with new ones and a script for regenerating them as needed. * Refactor TLS tests to a separate file. * Add cryptography to dev deps for rebuilding TLS certs. * Minor adjustment to messages. * Tests added for new TLS code. * Find the correct log row before testing for message. The order was different on CI. * More log message order fixup. The tests do not account for the logo being printed first. * Another attempt at log message indexing fixup. * Major TLS refactoring. CertSelector now allows dicts and SSLContext within its list. Server names are stored even when no list is used. SSLContext.sanic now contains a dict with any setting passed and information extracted from cert. That information is available on request.conn_info.cert. Type annotations added. More tests incl. a handler for faking hostname in tests. * Remove a problematic logger test that apparently was not adding any coverage or value to anything. * Revert accidental commit of uvloop disable. * Typing fixes / refactoring. * Additional test for cert selection. Certs recreated without DNS:localhost on sanic.example cert. * Add tests for single certificate path shorthand and SNI information. * Move TLS dict processing to CertSimple, make the names field optional and use names from the cert if absent. * Sanic CLI options --tls and --tls-strict-host to use the new features. * SSL argument typing updated * Use ValueError for internal message passing to avoid CertificateError's odd message formatting. * Linter * Test CLI TLS options. * Maybe the right codeclimate option now... * Improved TLS argument help, removed support for combining --cert/--key with --tls. * Removed support for strict checking without any certs, black forced fscked up formatting. * Update CLI tests for stricter TLS options. Co-authored-by: L. Karkkainen Co-authored-by: Adam Hopkins --- .codeclimate.yml | 2 + sanic/__main__.py | 62 +++- sanic/app.py | 78 ++-- sanic/models/server_types.py | 16 +- sanic/tls.py | 196 ++++++++++ setup.py | 1 + tests/certs/createcerts.py | 113 ++++++ tests/certs/invalid.certmissing/privkey.pem | 5 + tests/certs/localhost/fullchain.pem | 12 + tests/certs/localhost/privkey.pem | 5 + tests/certs/sanic.example/fullchain.pem | 21 ++ tests/certs/sanic.example/privkey.pem | 27 ++ tests/certs/selfsigned.cert | 22 -- tests/certs/selfsigned.key | 27 -- tests/test_cli.py | 52 +++ tests/test_logging.py | 55 --- tests/test_requests.py | 88 ----- tests/test_tls.py | 378 ++++++++++++++++++++ 18 files changed, 913 insertions(+), 247 deletions(-) create mode 100644 sanic/tls.py create mode 100644 tests/certs/createcerts.py create mode 100644 tests/certs/invalid.certmissing/privkey.pem create mode 100644 tests/certs/localhost/fullchain.pem create mode 100644 tests/certs/localhost/privkey.pem create mode 100644 tests/certs/sanic.example/fullchain.pem create mode 100644 tests/certs/sanic.example/privkey.pem delete mode 100644 tests/certs/selfsigned.cert delete mode 100644 tests/certs/selfsigned.key create mode 100644 tests/test_tls.py diff --git a/.codeclimate.yml b/.codeclimate.yml index 4f75715c..947d6ad4 100644 --- a/.codeclimate.yml +++ b/.codeclimate.yml @@ -21,5 +21,7 @@ checks: config: threshold: 40 complex-logic: + enabled: false + method-complexity: config: threshold: 10 diff --git a/sanic/__main__.py b/sanic/__main__.py index 7903c728..928c0d73 100644 --- a/sanic/__main__.py +++ b/sanic/__main__.py @@ -4,7 +4,7 @@ import sys from argparse import ArgumentParser, RawTextHelpFormatter from importlib import import_module from pathlib import Path -from typing import Any, Dict, Optional +from typing import Union from sanic_routing import __version__ as __routing_version__ # type: ignore @@ -79,10 +79,30 @@ def main(): help="location of unix socket\n ", ) parser.add_argument( - "--cert", dest="cert", type=str, help="Location of certificate for SSL" + "--cert", + dest="cert", + type=str, + help="Location of fullchain.pem, bundle.crt or equivalent", ) parser.add_argument( - "--key", dest="key", type=str, help="location of keyfile for SSL\n " + "--key", + dest="key", + type=str, + help="Location of privkey.pem or equivalent .key file", + ) + parser.add_argument( + "--tls", + metavar="DIR", + type=str, + action="append", + help="TLS certificate folder with fullchain.pem and privkey.pem\n" + "May be specified multiple times to choose of multiple certificates", + ) + parser.add_argument( + "--tls-strict-host", + dest="tlshost", + action="store_true", + help="Only allow clients that send an SNI matching server certs\n ", ) parser.add_bool_arguments( "--access-logs", dest="access_log", help="display access logs" @@ -126,6 +146,26 @@ def main(): ) args = parser.parse_args() + # Custom TLS mismatch handling for better diagnostics + if ( + # one of cert/key missing + bool(args.cert) != bool(args.key) + # new and old style args used together + or args.tls + and args.cert + # strict host checking without certs would always fail + or args.tlshost + and not args.tls + and not args.cert + ): + parser.print_usage(sys.stderr) + error_logger.error( + "sanic: error: TLS certificates must be specified by either of:\n" + " --cert certdir/fullchain.pem --key certdir/privkey.pem\n" + " --tls certdir (equivalent to the above)" + ) + sys.exit(1) + try: module_path = os.path.abspath(os.getcwd()) if module_path not in sys.path: @@ -155,14 +195,18 @@ def main(): f"Perhaps you meant {args.module}.app?" ) + ssl: Union[None, dict, str, list] = [] + if args.tlshost: + ssl.append(None) if args.cert is not None or args.key is not None: - ssl: Optional[Dict[str, Any]] = { - "cert": args.cert, - "key": args.key, - } - else: + ssl.append(dict(cert=args.cert, key=args.key)) + if args.tls: + ssl += args.tls + if not ssl: ssl = None - + elif len(ssl) == 1 and ssl[0] is not None: + # Use only one cert, no TLSSelector. + ssl = ssl[0] kwargs = { "host": args.host, "port": args.port, diff --git a/sanic/app.py b/sanic/app.py index 15e87111..baee112e 100644 --- a/sanic/app.py +++ b/sanic/app.py @@ -19,7 +19,7 @@ from functools import partial from inspect import isawaitable from pathlib import Path from socket import socket -from ssl import Purpose, SSLContext, create_default_context +from ssl import SSLContext from traceback import format_exc from types import SimpleNamespace from typing import ( @@ -78,6 +78,7 @@ from sanic.server import serve, serve_multiple, serve_single from sanic.server.protocols.websocket_protocol import WebSocketProtocol from sanic.server.websockets.impl import ConnectionClosed from sanic.signals import Signal, SignalRouter +from sanic.tls import process_to_context from sanic.touchup import TouchUp, TouchUpMeta @@ -952,7 +953,7 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta): *, debug: bool = False, auto_reload: Optional[bool] = None, - ssl: Union[Dict[str, str], SSLContext, None] = None, + ssl: Union[None, SSLContext, dict, str, list, tuple] = None, sock: Optional[socket] = None, workers: int = 1, protocol: Optional[Type[Protocol]] = None, @@ -979,7 +980,7 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta): :type auto_relaod: bool :param ssl: SSLContext, or location of certificate and key for SSL encryption of worker(s) - :type ssl: SSLContext or dict + :type ssl: str, dict, SSLContext or list :param sock: Socket for the server to accept connections from :type sock: socket :param workers: Number of processes received before it is respected @@ -1089,7 +1090,7 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta): port: Optional[int] = None, *, debug: bool = False, - ssl: Union[Dict[str, str], SSLContext, None] = None, + ssl: Union[None, SSLContext, dict, str, list, tuple] = None, sock: Optional[socket] = None, protocol: Type[Protocol] = None, backlog: int = 100, @@ -1281,16 +1282,6 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta): auto_reload=False, ): """Helper function used by `run` and `create_server`.""" - - if isinstance(ssl, dict): - # try common aliaseses - cert = ssl.get("cert") or ssl.get("certificate") - key = ssl.get("key") or ssl.get("keyfile") - if cert is None or key is None: - raise ValueError("SSLContext or certificate and key required.") - context = create_default_context(purpose=Purpose.CLIENT_AUTH) - context.load_cert_chain(cert, keyfile=key) - ssl = context if self.config.PROXIES_COUNT and self.config.PROXIES_COUNT < 0: raise ValueError( "PROXIES_COUNT cannot be negative. " @@ -1300,6 +1291,35 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta): self.error_handler.debug = debug self.debug = debug + if self.configure_logging and debug: + logger.setLevel(logging.DEBUG) + if ( + self.config.LOGO + and os.environ.get("SANIC_SERVER_RUNNING") != "true" + ): + logger.debug( + self.config.LOGO + if isinstance(self.config.LOGO, str) + else BASE_LOGO + ) + # Serve + if host and port: + proto = "http" + if ssl is not None: + proto = "https" + if unix: + logger.info(f"Goin' Fast @ {unix} {proto}://...") + else: + # colon(:) is legal for a host only in an ipv6 address + display_host = f"[{host}]" if ":" in host else host + logger.info(f"Goin' Fast @ {proto}://{display_host}:{port}") + + debug_mode = "enabled" if self.debug else "disabled" + reload_mode = "enabled" if auto_reload else "disabled" + logger.debug(f"Sanic auto-reload: {reload_mode}") + logger.debug(f"Sanic debug mode: {debug_mode}") + + ssl = process_to_context(ssl) server_settings = { "protocol": protocol, @@ -1328,39 +1348,9 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta): listeners = [partial(listener, self) for listener in listeners] server_settings[settings_name] = listeners - if self.configure_logging and debug: - logger.setLevel(logging.DEBUG) - - if ( - self.config.LOGO - and os.environ.get("SANIC_SERVER_RUNNING") != "true" - ): - logger.debug( - self.config.LOGO - if isinstance(self.config.LOGO, str) - else BASE_LOGO - ) - if run_async: server_settings["run_async"] = True - # Serve - if host and port: - proto = "http" - if ssl is not None: - proto = "https" - if unix: - logger.info(f"Goin' Fast @ {unix} {proto}://...") - else: - # colon(:) is legal for a host only in an ipv6 address - display_host = f"[{host}]" if ":" in host else host - logger.info(f"Goin' Fast @ {proto}://{display_host}:{port}") - - debug_mode = "enabled" if self.debug else "disabled" - reload_mode = "enabled" if auto_reload else "disabled" - logger.debug(f"Sanic auto-reload: {reload_mode}") - logger.debug(f"Sanic debug mode: {debug_mode}") - return server_settings def _build_endpoint_name(self, *parts): diff --git a/sanic/models/server_types.py b/sanic/models/server_types.py index f0ced247..ec9588bf 100644 --- a/sanic/models/server_types.py +++ b/sanic/models/server_types.py @@ -1,4 +1,6 @@ +from ssl import SSLObject from types import SimpleNamespace +from typing import Optional from sanic.models.protocol_types import TransportProtocol @@ -20,8 +22,10 @@ class ConnInfo: "peername", "server_port", "server", + "server_name", "sockname", "ssl", + "cert", ) def __init__(self, transport: TransportProtocol, unix=None): @@ -31,8 +35,16 @@ class ConnInfo: self.server_port = self.client_port = 0 self.client_ip = "" self.sockname = addr = transport.get_extra_info("sockname") - self.ssl: bool = bool(transport.get_extra_info("sslcontext")) - + self.ssl = False + self.server_name = "" + self.cert = {} + sslobj: Optional[SSLObject] = transport.get_extra_info( + "ssl_object" + ) # type: ignore + if sslobj: + self.ssl = True + self.server_name = getattr(sslobj, "sanic_server_name", None) or "" + self.cert = getattr(sslobj.context, "sanic", {}) if isinstance(addr, str): # UNIX socket self.server = unix or addr return diff --git a/sanic/tls.py b/sanic/tls.py new file mode 100644 index 00000000..d99b8f93 --- /dev/null +++ b/sanic/tls.py @@ -0,0 +1,196 @@ +import os +import ssl + +from typing import Iterable, Optional, Union + +from sanic.log import logger + + +# Only allow secure ciphers, notably leaving out AES-CBC mode +# OpenSSL chooses ECDSA or RSA depending on the cert in use +CIPHERS_TLS12 = [ + "ECDHE-ECDSA-CHACHA20-POLY1305", + "ECDHE-ECDSA-AES256-GCM-SHA384", + "ECDHE-ECDSA-AES128-GCM-SHA256", + "ECDHE-RSA-CHACHA20-POLY1305", + "ECDHE-RSA-AES256-GCM-SHA384", + "ECDHE-RSA-AES128-GCM-SHA256", +] + + +def create_context( + certfile: Optional[str] = None, + keyfile: Optional[str] = None, + password: Optional[str] = None, +) -> ssl.SSLContext: + """Create a context with secure crypto and HTTP/1.1 in protocols.""" + context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) + context.minimum_version = ssl.TLSVersion.TLSv1_2 + context.set_ciphers(":".join(CIPHERS_TLS12)) + context.set_alpn_protocols(["http/1.1"]) + context.sni_callback = server_name_callback + if certfile and keyfile: + context.load_cert_chain(certfile, keyfile, password) + return context + + +def shorthand_to_ctx( + ctxdef: Union[None, ssl.SSLContext, dict, str] +) -> Optional[ssl.SSLContext]: + """Convert an ssl argument shorthand to an SSLContext object.""" + if ctxdef is None or isinstance(ctxdef, ssl.SSLContext): + return ctxdef + if isinstance(ctxdef, str): + return load_cert_dir(ctxdef) + if isinstance(ctxdef, dict): + return CertSimple(**ctxdef) + raise ValueError( + f"Invalid ssl argument {type(ctxdef)}." + " Expecting a list of certdirs, a dict or an SSLContext." + ) + + +def process_to_context( + ssldef: Union[None, ssl.SSLContext, dict, str, list, tuple] +) -> Optional[ssl.SSLContext]: + """Process app.run ssl argument from easy formats to full SSLContext.""" + return ( + CertSelector(map(shorthand_to_ctx, ssldef)) + if isinstance(ssldef, (list, tuple)) + else shorthand_to_ctx(ssldef) + ) + + +def load_cert_dir(p: str) -> ssl.SSLContext: + if os.path.isfile(p): + raise ValueError(f"Certificate folder expected but {p} is a file.") + keyfile = os.path.join(p, "privkey.pem") + certfile = os.path.join(p, "fullchain.pem") + if not os.access(keyfile, os.R_OK): + raise ValueError( + f"Certificate not found or permission denied {keyfile}" + ) + if not os.access(certfile, os.R_OK): + raise ValueError( + f"Certificate not found or permission denied {certfile}" + ) + return CertSimple(certfile, keyfile) + + +class CertSimple(ssl.SSLContext): + """A wrapper for creating SSLContext with a sanic attribute.""" + + def __new__(cls, cert, key, **kw): + # try common aliases, rename to cert/key + certfile = kw["cert"] = kw.pop("certificate", None) or cert + keyfile = kw["key"] = kw.pop("keyfile", None) or key + password = kw.pop("password", None) + if not certfile or not keyfile: + raise ValueError("SSL dict needs filenames for cert and key.") + subject = {} + if "names" not in kw: + cert = ssl._ssl._test_decode_cert(certfile) # type: ignore + kw["names"] = [ + name + for t, name in cert["subjectAltName"] + if t in ["DNS", "IP Address"] + ] + subject = {k: v for item in cert["subject"] for k, v in item} + self = create_context(certfile, keyfile, password) + self.__class__ = cls + self.sanic = {**subject, **kw} + return self + + def __init__(self, cert, key, **kw): + pass # Do not call super().__init__ because it is already initialized + + +class CertSelector(ssl.SSLContext): + """Automatically select SSL certificate based on the hostname that the + client is trying to access, via SSL SNI. Paths to certificate folders + with privkey.pem and fullchain.pem in them should be provided, and + will be matched in the order given whenever there is a new connection. + """ + + def __new__(cls, ctxs): + return super().__new__(cls) + + def __init__(self, ctxs: Iterable[Optional[ssl.SSLContext]]): + super().__init__() + self.sni_callback = selector_sni_callback # type: ignore + self.sanic_select = [] + self.sanic_fallback = None + all_names = [] + for i, ctx in enumerate(ctxs): + if not ctx: + continue + names = getattr(ctx, "sanic", {}).get("names", []) + all_names += names + self.sanic_select.append(ctx) + if i == 0: + self.sanic_fallback = ctx + if not all_names: + raise ValueError( + "No certificates with SubjectAlternativeNames found." + ) + logger.info(f"Certificate vhosts: {', '.join(all_names)}") + + +def find_cert(self: CertSelector, server_name: str): + """Find the first certificate that matches the given SNI. + + :raises ssl.CertificateError: No matching certificate found. + :return: A matching ssl.SSLContext object if found.""" + if not server_name: + if self.sanic_fallback: + return self.sanic_fallback + raise ValueError( + "The client provided no SNI to match for certificate." + ) + for ctx in self.sanic_select: + if match_hostname(ctx, server_name): + return ctx + if self.sanic_fallback: + return self.sanic_fallback + raise ValueError(f"No certificate found matching hostname {server_name!r}") + + +def match_hostname( + ctx: Union[ssl.SSLContext, CertSelector], hostname: str +) -> bool: + """Match names from CertSelector against a received hostname.""" + # Local certs are considered trusted, so this can be less pedantic + # and thus faster than the deprecated ssl.match_hostname function is. + names = getattr(ctx, "sanic", {}).get("names", []) + hostname = hostname.lower() + for name in names: + if name.startswith("*."): + if hostname.split(".", 1)[-1] == name[2:]: + return True + elif name == hostname: + return True + return False + + +def selector_sni_callback( + sslobj: ssl.SSLObject, server_name: str, ctx: CertSelector +) -> Optional[int]: + """Select a certificate mathing the SNI.""" + # Call server_name_callback to store the SNI on sslobj + server_name_callback(sslobj, server_name, ctx) + # Find a new context matching the hostname + try: + sslobj.context = find_cert(ctx, server_name) + except ValueError as e: + logger.warning(f"Rejecting TLS connection: {e}") + # This would show ERR_SSL_UNRECOGNIZED_NAME_ALERT on client side if + # asyncio/uvloop did proper SSL shutdown. They don't. + return ssl.ALERT_DESCRIPTION_UNRECOGNIZED_NAME + return None # mypy complains without explicit return + + +def server_name_callback( + sslobj: ssl.SSLObject, server_name: str, ctx: ssl.SSLContext +) -> None: + """Store the received SNI as sslobj.sanic_server_name.""" + sslobj.sanic_server_name = server_name # type: ignore diff --git a/setup.py b/setup.py index ecbf1e07..2d7b9f0e 100644 --- a/setup.py +++ b/setup.py @@ -123,6 +123,7 @@ docs_require = [ ] dev_require = tests_require + [ + "cryptography", "tox", "towncrier", ] diff --git a/tests/certs/createcerts.py b/tests/certs/createcerts.py new file mode 100644 index 00000000..34415961 --- /dev/null +++ b/tests/certs/createcerts.py @@ -0,0 +1,113 @@ +from datetime import datetime, timedelta +from ipaddress import ip_address +from os import path + +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec, rsa +from cryptography.x509 import ( + BasicConstraints, + CertificateBuilder, + DNSName, + ExtendedKeyUsage, + IPAddress, + KeyUsage, + Name, + NameAttribute, + SubjectAlternativeName, + random_serial_number, +) +from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID + + +def writefiles(key, cert): + cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + folder = path.join(path.dirname(__file__), cn) + with open(path.join(folder, "fullchain.pem"), "wb") as f: + f.write(cert.public_bytes(serialization.Encoding.PEM)) + + with open(path.join(folder, "privkey.pem"), "wb") as f: + f.write( + key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption(), + ) + ) + + +def selfsigned(key, common_name, san): + subject = issuer = Name( + [ + NameAttribute(NameOID.COMMON_NAME, common_name), + NameAttribute(NameOID.ORGANIZATION_NAME, "Sanic Org"), + ] + ) + cert = ( + CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(key.public_key()) + .serial_number(random_serial_number()) + .not_valid_before(datetime.utcnow()) + .not_valid_after(datetime.utcnow() + timedelta(days=365.25 * 8)) + .add_extension( + KeyUsage( + True, False, False, False, False, False, False, False, False + ), + critical=True, + ) + .add_extension( + ExtendedKeyUsage( + [ + ExtendedKeyUsageOID.SERVER_AUTH, + ExtendedKeyUsageOID.CLIENT_AUTH, + ] + ), + critical=False, + ) + .add_extension( + BasicConstraints(ca=True, path_length=None), + critical=True, + ) + .add_extension( + SubjectAlternativeName( + [ + IPAddress(ip_address(n)) + if n[0].isdigit() or ":" in n + else DNSName(n) + for n in san + ] + ), + critical=False, + ) + .sign(key, hashes.SHA256()) + ) + return cert + + +# Sanic example/test self-signed cert RSA +key = rsa.generate_private_key(public_exponent=65537, key_size=2048) +cert = selfsigned( + key, + "sanic.example", + [ + "sanic.example", + "www.sanic.example", + "*.sanic.test", + "2001:db8::541c", + ], +) +writefiles(key, cert) + +# Sanic localhost self-signed cert ECDSA +key = ec.generate_private_key(ec.SECP256R1) +cert = selfsigned( + key, + "localhost", + [ + "localhost", + "127.0.0.1", + "::1", + ], +) +writefiles(key, cert) diff --git a/tests/certs/invalid.certmissing/privkey.pem b/tests/certs/invalid.certmissing/privkey.pem new file mode 100644 index 00000000..5caf94e1 --- /dev/null +++ b/tests/certs/invalid.certmissing/privkey.pem @@ -0,0 +1,5 @@ +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIFP3fCUob41U1wvVOvei4dGsXrZeSiBUCX/xVu9215bvoAoGCCqGSM49 +AwEHoUQDQgAEvBHo/RatEnPRBeiLURXX2sQDBbr9XRb73Fvm8jIOrPyJg8PcvNXH +D1jQah5K60THdjmdkLsY/hamZfqLb24EFQ== +-----END EC PRIVATE KEY----- diff --git a/tests/certs/localhost/fullchain.pem b/tests/certs/localhost/fullchain.pem new file mode 100644 index 00000000..532343ac --- /dev/null +++ b/tests/certs/localhost/fullchain.pem @@ -0,0 +1,12 @@ +-----BEGIN CERTIFICATE----- +MIIBwjCCAWigAwIBAgIUQOCJIPRMiZsOMmvH0uiofxEDFn8wCgYIKoZIzj0EAwIw +KDESMBAGA1UEAwwJbG9jYWxob3N0MRIwEAYDVQQKDAlTYW5pYyBPcmcwHhcNMjEx +MDE5MTcwMTE3WhcNMjkxMDE5MTcwMTE3WjAoMRIwEAYDVQQDDAlsb2NhbGhvc3Qx +EjAQBgNVBAoMCVNhbmljIE9yZzBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABHf0 +SrvRtGF9KIXEtk4+6vsqleNaleuYVvf4d6TD3pX1CbOV/NsZdW6+EhkA1U2pEBnJ +txXqAGVJT4ans8ud3K6jcDBuMA4GA1UdDwEB/wQEAwIHgDAdBgNVHSUEFjAUBggr +BgEFBQcDAQYIKwYBBQUHAwIwDwYDVR0TAQH/BAUwAwEB/zAsBgNVHREEJTAjggls +b2NhbGhvc3SHBH8AAAGHEAAAAAAAAAAAAAAAAAAAAAEwCgYIKoZIzj0EAwIDSAAw +RQIhAJhwopVuiW0S4MKEDCl+Vxwyei5AYobrALcP0pwGpFzIAiAWkxMPeAOMWIjq +LD4t2UZ9h6ma2fS2Jf9pzTon6438Ng== +-----END CERTIFICATE----- diff --git a/tests/certs/localhost/privkey.pem b/tests/certs/localhost/privkey.pem new file mode 100644 index 00000000..b1e2cef5 --- /dev/null +++ b/tests/certs/localhost/privkey.pem @@ -0,0 +1,5 @@ +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIDKTs1c2Qo7KMQ8DJrmIuNb29z2fNi4O+TNkJWjvclvsoAoGCCqGSM49 +AwEHoUQDQgAEd/RKu9G0YX0ohcS2Tj7q+yqV41qV65hW9/h3pMPelfUJs5X82xl1 +br4SGQDVTakQGcm3FeoAZUlPhqezy53crg== +-----END EC PRIVATE KEY----- diff --git a/tests/certs/sanic.example/fullchain.pem b/tests/certs/sanic.example/fullchain.pem new file mode 100644 index 00000000..abe6089e --- /dev/null +++ b/tests/certs/sanic.example/fullchain.pem @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDdzCCAl+gAwIBAgIUF1H0To9k3mUiMT8mjF6g45A9KgcwDQYJKoZIhvcNAQEL +BQAwLDEWMBQGA1UEAwwNc2FuaWMuZXhhbXBsZTESMBAGA1UECgwJU2FuaWMgT3Jn +MB4XDTIxMTAxOTE3MDExN1oXDTI5MTAxOTE3MDExN1owLDEWMBQGA1UEAwwNc2Fu +aWMuZXhhbXBsZTESMBAGA1UECgwJU2FuaWMgT3JnMIIBIjANBgkqhkiG9w0BAQEF +AAOCAQ8AMIIBCgKCAQEAzNeC95zB5LRybz9Wl16+Q4kbOLgXlyUQVKhg9OZD1ChN +3T4Ya/KvChQmPWOWdF814NgkkNS1yHKXlORU2Ljbpqzr+WoOAwGVixbRTknjmI46 +glUhCOJlGqxl16RfuYA2BWv0+At9jKBhT1tnrGVhfqldnxsb4FDh0JsFnrZN4/DB +z6x8PY1z0eQMgsyeKAfSTTnGXhkZzAQz6afuQbGZhe8vQIUTvwmnZiU9OdUZ6nLc +b7lSbIQ1edT6/xXUkbn5ixGsEQTf6JWLqEDLkqpo9sbkYmvMMQpj2pCgtaEjx7An ++hQe8Itv+i6h0KD3ARVeCBgdWEgXZTs7zKrmU77xfwIDAQABo4GQMIGNMA4GA1Ud +DwEB/wQEAwIHgDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDwYDVR0T +AQH/BAUwAwEB/zBLBgNVHREERDBCgg1zYW5pYy5leGFtcGxlghF3d3cuc2FuaWMu +ZXhhbXBsZYIMKi5zYW5pYy50ZXN0hxAgAQ24AAAAAAAAAAAAAFQcMA0GCSqGSIb3 +DQEBCwUAA4IBAQBLV7xSEI7308Qmm3SyV+ro9jQ/i2ydwUIUyRMtf04EFRS8fHK/ +Lln5Yweaba9XP5k3DLSC63Qg1tE50fVqQypbWVA4SMkMW21cK8vEhHEYeGYkHsuC +xCFdwJYhmofqWaQ/j/ErLBrQbaHBdSJ/Nou5RPRtM4HrSU7F2azLGmLczYk6PcZa +wSBvoXdjiEUrRl7XB0iB2ktTga6amuYz4bSJzUvaA8SodJzC4OKhRsduUD83LdDi +2As4KiTcSO/SOCaK2KmbPNBlTKMF4cpqysGMvmnGVWhECOG1PZItJkWNbbBV4XRR +qGmrey2JwDDeTYHFDHaND385/PSJKfSSGLNk +-----END CERTIFICATE----- diff --git a/tests/certs/sanic.example/privkey.pem b/tests/certs/sanic.example/privkey.pem new file mode 100644 index 00000000..b40fee74 --- /dev/null +++ b/tests/certs/sanic.example/privkey.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAzNeC95zB5LRybz9Wl16+Q4kbOLgXlyUQVKhg9OZD1ChN3T4Y +a/KvChQmPWOWdF814NgkkNS1yHKXlORU2Ljbpqzr+WoOAwGVixbRTknjmI46glUh +COJlGqxl16RfuYA2BWv0+At9jKBhT1tnrGVhfqldnxsb4FDh0JsFnrZN4/DBz6x8 +PY1z0eQMgsyeKAfSTTnGXhkZzAQz6afuQbGZhe8vQIUTvwmnZiU9OdUZ6nLcb7lS +bIQ1edT6/xXUkbn5ixGsEQTf6JWLqEDLkqpo9sbkYmvMMQpj2pCgtaEjx7An+hQe +8Itv+i6h0KD3ARVeCBgdWEgXZTs7zKrmU77xfwIDAQABAoIBABWKpG89wPY4M8CX +PJf2krOve3lfgruWXj1I58lZXdC13Fpj6VWQ0++PZuYVzwC18oiOsmm4tNU7l81E +pdeUuSSyEq7MBGU0iXFzGNfO1Wx5qJWENlEk3dUMRDmFQ7vSS9wOGljrfGyJgTJD +PofWsYYMcZgF1cylNNonM1QZf990hfd0JDfO6CHCloRe/pKIdVzIxQp+3Ju/3OPk +Gw5V+YnVrG4wdZbhOCW2hPp/TLdgFy/xHvrxkEkGx+2ZHGCw9uFj2LRZJwwuaO9p +LDzbyfbFlPWIHdPamdBvenZ6RNTf28+YsbiqwoOk5C286QYb/VDnT8UnG42hXS1I +p3m//qECgYEA7zXmMSBy1tkMQsuaAakOFfl2HfVL2rrW6/CH6BwcCHUD6Wr8wv6a +kPNhI6pqqnP6Xg8XqJXfyIVZOJYPQMQr69zni2y7b3jPOemVGTBSqN7UE71NZkHF ++HZov55bPuX/KD6qc/WAXCyEcISy9TmcA7cEN7ivmyXmbuSXEoiAjlsCgYEA2zgU +mzL6ObJ2555UOqzGCMx6o2KQqOgA1SGmYLBRX77I3fuvGj+DLo6/iuM0FcVV7alG +U/U6qqrSymtdRgeZXHziSVhLZKY/qobgKG2iO1F3DzqyZ94EK/v0XRS4UyiJma3f +lwVG/BcVnv+FKCYUo2JKGln0R8Wcm6D9Nxp0mq0CgYEAn0Dj+oreyZiAqCuCYV6a +SRjmgTVghcNj+HoPEQE9zIeSziBzHKKCZsQRRLxc/RPveBVWK99zt7zHVHvatcSk +dQeBg3olIyZr1+NhZv6b2V9YE7gwwkZBtZOnUwLrPmnCwJlPw5mLFlJw7bP6rHXp +HzQF887Z4lGOIv++cBE+fQcCgYEArF26BhXdHcSvLYsWW1RCGeT9gL4dVFGnZe2h +bmD0er3+Hlyo35CUyuS+wqvG5l9VIxt4CsfFKzBJsZMdsdSDx28CVf0wuqDlamXG +lsMtTkrNvJHAeV7eFN900kNaczhqiQVnys0BdXGJNI1g26Klk5nS/klAg7ZjXxME +RnFswbkCgYBG5OToLXM8pg3yTM9MHMSXFhnnd2MbBK2AySFah2P1V4xv1rJdklU0 +9QRTd/hQmYGHioPIF9deU8YSWlj+FBimyoNfJ51YzFyp2maOSJq4Wxe1nv2DflRK +gh5pkl8FizoDnu8BHu1AjOfRQJ3/tCIi2XZJgBuCxyTjd1b6hVUhyg== +-----END RSA PRIVATE KEY----- diff --git a/tests/certs/selfsigned.cert b/tests/certs/selfsigned.cert deleted file mode 100644 index 0dc7b914..00000000 --- a/tests/certs/selfsigned.cert +++ /dev/null @@ -1,22 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDtTCCAp2gAwIBAgIJAO6wb0FSc/rNMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV -BAYTAlVTMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX -aWRnaXRzIFB0eSBMdGQwHhcNMTcwMzAzMTUyODAzWhcNMTkxMTI4MTUyODAzWjBF -MQswCQYDVQQGEwJVUzETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50 -ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB -CgKCAQEAsy7Zb3p4yCEnUtPLwqeJrwj9u/ZmcFCrMAktFBx9hG6rY2r7mdB6Bflh -V5cUJXxnsNiDpYcxGhA8kry7pEork1vZ05DyZC9ulVlvxBouVShBcLLwdpaoTGqE -vYtejv6x7ogwMXOjkWWb1WpOv4CVhpeXJ7O/d1uAiYgcUpTpPp4ONG49IAouBHq3 -h+o4nVvNfB0J8gaCtTsTZqi1Wt8WYs3XjxGJaKh//ealfRe1kuv40CWQ8gjaC8/1 -w9pHdom3Wi/RwfDM3+dVGV6M5lAbPXMB4RK17Hk9P3hlJxJOpKBdgcBJPXtNrTwf -qEWWxk2mB/YVyB84AxjkkNoYyi2ggQIDAQABo4GnMIGkMB0GA1UdDgQWBBRa46Ix -9s9tmMqu+Zz1mocHghm4NTB1BgNVHSMEbjBsgBRa46Ix9s9tmMqu+Zz1mocHghm4 -NaFJpEcwRTELMAkGA1UEBhMCVVMxEzARBgNVBAgTClNvbWUtU3RhdGUxITAfBgNV -BAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAO6wb0FSc/rNMAwGA1UdEwQF -MAMBAf8wDQYJKoZIhvcNAQELBQADggEBACdrnM8zb7abxAJsU5WLn1IR0f2+EFA7 -ezBEJBM4bn0IZrXuP5ThZ2wieJlshG0C16XN9+zifavHci+AtQwWsB0f/ppHdvWQ -7wt7JN88w+j0DNIYEadRCjWxR3gRAXPgKu3sdyScKFq8MvB49A2EdXRmQSTIM6Fj -teRbE+poxewFT0mhurf3xrtGiSALmv7uAzhRDqpYUzcUlbOGgkyFLYAOOdvZvei+ -mfXDi4HKYxgyv53JxBARMdajnCHXM7zQ6Tjc8j1HRtmDQ3XapUB559KfxfODGQq5 -zmeoZWU4duxcNXJM0Eiz1CJ39JoWwi8sqaGi/oskuyAh7YKyVTn8xa8= ------END CERTIFICATE----- diff --git a/tests/certs/selfsigned.key b/tests/certs/selfsigned.key deleted file mode 100644 index 504ef7da..00000000 --- a/tests/certs/selfsigned.key +++ /dev/null @@ -1,27 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIIEpAIBAAKCAQEAsy7Zb3p4yCEnUtPLwqeJrwj9u/ZmcFCrMAktFBx9hG6rY2r7 -mdB6BflhV5cUJXxnsNiDpYcxGhA8kry7pEork1vZ05DyZC9ulVlvxBouVShBcLLw -dpaoTGqEvYtejv6x7ogwMXOjkWWb1WpOv4CVhpeXJ7O/d1uAiYgcUpTpPp4ONG49 -IAouBHq3h+o4nVvNfB0J8gaCtTsTZqi1Wt8WYs3XjxGJaKh//ealfRe1kuv40CWQ -8gjaC8/1w9pHdom3Wi/RwfDM3+dVGV6M5lAbPXMB4RK17Hk9P3hlJxJOpKBdgcBJ -PXtNrTwfqEWWxk2mB/YVyB84AxjkkNoYyi2ggQIDAQABAoIBAFgVasxTf3aaXbNo -7JzXMWb7W4iAG2GRNmZZzHA7hTSKFvS7jc3SX3n6WvDtEvlOi8ay2RyRNgEjBDP6 -VZ/w2jUJjS5k7dN0Qb9nhPr5B9fS/0CAppcVfsx5/KEVFzniWOPyzQYyW7FJKu8h -4G5hrp/Ie4UH5tKtB6YUZB/wliyyQUkAZdBcoy1hfkOZLAXb1oofArKsiQUHIRA5 -th1yyS4cZP8Upngd1EE+d95dFHM2F6iI2lj6DHuu+JxUZ+wKXoNimdG7JniRtIf4 -56GoDov83Ey+XbIS6FSQc9nY0ijBDcubl/yP3roCQpE+MZ9BNEo5uj7YmCtAMYLW -TXTNBGUCgYEA4wdkH1NLdub2NcpqwmSA0AtbRvDkt0XTDWWwmuMr/+xPVa4sUKHs -80THQEX/WAZroP6IPbMP6BJhzb53vECukgC65qPxu6M9D1lBGtglxgen4AMu1bKK -gnM8onwARGIo/2ay6qRRZZCxg0TvBky3hbTcIM2zVrnKU6VVyGKHSV8CgYEAygxs -WQYrACv3XN6ZEzyxy08JgjbcnkPWK/m3VPcyHgdEkDu8+nDdUVdbF/js2JWMMx5g -vrPhZ7jVLOXGcLr5mVU4dG5tW5lU0bMy+YYxpEQDiBKlpXgfOsQnakHj7cCZ6bay -mKjJck2oEAQS9bqOJN/Ts5vhOmc8rmhkO7hnAh8CgYEArhVDy9Vl/1WYo6SD+m1w -bJbYtewPpQzwicxZAFuDqKk+KDf3GRkhBWTO2FUUOB4sN3YVaCI+5zf5MPeE/qAm -fCP9LM+3k6bXMkbBamEljdTfACHQruJJ3T+Z1gn5dnZCc5z/QncfRx8NTtfz5MO8 -0dTeGnVAuBacs0kLHy2WCUcCgYALNBkl7pOf1NBIlAdE686oCV/rmoMtO3G6yoQB -8BsVUy3YGZfnAy8ifYeNkr3/XHuDsiGHMY5EJBmd/be9NID2oaUZv63MsHnljtw6 -vdgu1Z6kgvQwcrK4nXvaBoFPA6kFLp5EnMde0TOKf89VVNzg6pBgmzon9OWGfj9g -mF8N3QKBgQCeoLwxUxpzEA0CPHm7DWF0LefVGllgZ23Eqncdy0QRku5zwwibszbL -sWaR3uDCc3oYcbSGCDVx3cSkvMAJNalc5ZHPfoV9W0+v392/rrExo5iwD8CSoCb2 -gFWkeR7PBrD3NzFzFAWyiudzhBKHfRsB0MpCXbJV/WLqTlGIbEypjg== ------END RSA PRIVATE KEY----- diff --git a/tests/test_cli.py b/tests/test_cli.py index 4e386c52..6112d1ed 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -45,6 +45,58 @@ def test_server_run(appname): assert firstline == b"Goin' Fast @ http://127.0.0.1:8000" +@pytest.mark.parametrize( + "cmd", + ( + ( + "--cert=certs/sanic.example/fullchain.pem", + "--key=certs/sanic.example/privkey.pem", + ), + ( + "--tls=certs/sanic.example/", + "--tls=certs/localhost/", + ), + ( + "--tls=certs/sanic.example/", + "--tls=certs/localhost/", + "--tls-strict-host", + ), + ), +) +def test_tls_options(cmd): + command = ["sanic", "fake.server.app", *cmd, "-p=9999", "--debug"] + out, err, exitcode = capture(command) + assert exitcode != 1 + lines = out.split(b"\n") + firstline = lines[6] + assert firstline == b"Goin' Fast @ https://127.0.0.1:9999" + + +@pytest.mark.parametrize( + "cmd", + ( + ( + "--cert=certs/sanic.example/fullchain.pem", + ), + ( + "--cert=certs/sanic.example/fullchain.pem", + "--key=certs/sanic.example/privkey.pem", + "--tls=certs/localhost/", + ), + ( + "--tls-strict-host", + ), + ), +) +def test_tls_wrong_options(cmd): + command = ["sanic", "fake.server.app", *cmd, "-p=9999", "--debug"] + out, err, exitcode = capture(command) + assert exitcode == 1 + assert not out + errmsg = err.decode().split("sanic: error: ")[1].split("\n")[0] + assert errmsg == "TLS certificates must be specified by either of:" + + @pytest.mark.parametrize( "cmd", ( diff --git a/tests/test_logging.py b/tests/test_logging.py index 639bb2ee..c475b00b 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -1,6 +1,4 @@ import logging -import os -import sys import uuid from importlib import reload @@ -9,12 +7,9 @@ from unittest.mock import Mock import pytest -from sanic_testing.testing import SanicTestClient - import sanic from sanic import Sanic -from sanic.compat import OS_IS_WINDOWS from sanic.log import LOGGING_CONFIG_DEFAULTS, logger from sanic.response import text @@ -155,56 +150,6 @@ async def test_logger(caplog): assert record in caplog.record_tuples -@pytest.mark.skipif( - OS_IS_WINDOWS and sys.version_info >= (3, 8), - reason="Not testable with current client", -) -def test_logger_static_and_secure(caplog): - # Same as test_logger, except for more coverage: - # - test_client initialised separately for static port - # - using ssl - rand_string = str(uuid.uuid4()) - - app = Sanic(name=__name__) - - @app.get("/") - def log_info(request): - logger.info(rand_string) - return text("hello") - - current_dir = os.path.dirname(os.path.realpath(__file__)) - ssl_cert = os.path.join(current_dir, "certs/selfsigned.cert") - ssl_key = os.path.join(current_dir, "certs/selfsigned.key") - - ssl_dict = {"cert": ssl_cert, "key": ssl_key} - - test_client = SanicTestClient(app, port=42101) - with caplog.at_level(logging.INFO): - request, response = test_client.get( - f"https://127.0.0.1:{test_client.port}/", - server_kwargs=dict(ssl=ssl_dict), - ) - - port = test_client.port - - assert caplog.record_tuples[0] == ( - "sanic.root", - logging.INFO, - f"Goin' Fast @ https://127.0.0.1:{port}", - ) - assert caplog.record_tuples[1] == ( - "sanic.root", - logging.INFO, - f"https://127.0.0.1:{port}/", - ) - assert caplog.record_tuples[2] == ("sanic.root", logging.INFO, rand_string) - assert caplog.record_tuples[-1] == ( - "sanic.root", - logging.INFO, - "Server Stopped", - ) - - def test_logging_modified_root_logger_config(): # reset_logging() diff --git a/tests/test_requests.py b/tests/test_requests.py index 35b2c900..e5db9d20 100644 --- a/tests/test_requests.py +++ b/tests/test_requests.py @@ -1,6 +1,4 @@ import logging -import os -import ssl from json import dumps as json_dumps from json import loads as json_loads @@ -1119,92 +1117,6 @@ async def test_url_attributes_no_ssl_asgi(app, path, query, expected_url): assert parsed.netloc == request.host -@pytest.mark.parametrize( - "path,query,expected_url", - [ - ("/foo", "", "https://{}:{}/foo"), - ("/bar/baz", "", "https://{}:{}/bar/baz"), - ("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"), - ], -) -def test_url_attributes_with_ssl_context(app, path, query, expected_url): - current_dir = os.path.dirname(os.path.realpath(__file__)) - context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) - context.load_cert_chain( - os.path.join(current_dir, "certs/selfsigned.cert"), - keyfile=os.path.join(current_dir, "certs/selfsigned.key"), - ) - - async def handler(request): - return text("OK") - - app.add_route(handler, path) - - port = app.test_client.port - request, response = app.test_client.get( - f"https://{HOST}:{PORT}" + path + f"?{query}", - server_kwargs={"ssl": context}, - ) - assert request.url == expected_url.format(HOST, request.server_port) - - parsed = urlparse(request.url) - - assert parsed.scheme == request.scheme - assert parsed.path == request.path - assert parsed.query == request.query_string - assert parsed.netloc == request.host - - -@pytest.mark.parametrize( - "path,query,expected_url", - [ - ("/foo", "", "https://{}:{}/foo"), - ("/bar/baz", "", "https://{}:{}/bar/baz"), - ("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"), - ], -) -def test_url_attributes_with_ssl_dict(app, path, query, expected_url): - - current_dir = os.path.dirname(os.path.realpath(__file__)) - ssl_cert = os.path.join(current_dir, "certs/selfsigned.cert") - ssl_key = os.path.join(current_dir, "certs/selfsigned.key") - - ssl_dict = {"cert": ssl_cert, "key": ssl_key} - - async def handler(request): - return text("OK") - - app.add_route(handler, path) - - request, response = app.test_client.get( - f"https://{HOST}:{PORT}" + path + f"?{query}", - server_kwargs={"ssl": ssl_dict}, - ) - assert request.url == expected_url.format(HOST, request.server_port) - - parsed = urlparse(request.url) - - assert parsed.scheme == request.scheme - assert parsed.path == request.path - assert parsed.query == request.query_string - assert parsed.netloc == request.host - - -def test_invalid_ssl_dict(app): - @app.get("/test") - async def handler(request): - return text("ssl test") - - ssl_dict = {"cert": None, "key": None} - - with pytest.raises(ValueError) as excinfo: - request, response = app.test_client.get( - "/test", server_kwargs={"ssl": ssl_dict} - ) - - assert str(excinfo.value) == "SSLContext or certificate and key required." - - def test_form_with_multiple_values(app): @app.route("/", methods=["POST"]) async def handler(request): diff --git a/tests/test_tls.py b/tests/test_tls.py new file mode 100644 index 00000000..b0674be4 --- /dev/null +++ b/tests/test_tls.py @@ -0,0 +1,378 @@ +import logging +import os +import ssl +import uuid + +from contextlib import contextmanager +from urllib.parse import urlparse + +import pytest + +from sanic_testing.testing import HOST, PORT, SanicTestClient + +from sanic import Sanic +from sanic.compat import OS_IS_WINDOWS +from sanic.log import logger +from sanic.response import text + + +current_dir = os.path.dirname(os.path.realpath(__file__)) +localhost_dir = os.path.join(current_dir, "certs/localhost") +sanic_dir = os.path.join(current_dir, "certs/sanic.example") +invalid_dir = os.path.join(current_dir, "certs/invalid.nonexist") +localhost_cert = os.path.join(localhost_dir, "fullchain.pem") +localhost_key = os.path.join(localhost_dir, "privkey.pem") +sanic_cert = os.path.join(sanic_dir, "fullchain.pem") +sanic_key = os.path.join(sanic_dir, "privkey.pem") + + +@contextmanager +def replace_server_name(hostname): + """Temporarily replace the server name sent with all TLS requests with a fake hostname.""" + + def hack_wrap_bio( + self, + incoming, + outgoing, + server_side=False, + server_hostname=None, + session=None, + ): + return orig_wrap_bio( + self, incoming, outgoing, server_side, hostname, session + ) + + orig_wrap_bio, ssl.SSLContext.wrap_bio = ( + ssl.SSLContext.wrap_bio, + hack_wrap_bio, + ) + try: + yield + finally: + ssl.SSLContext.wrap_bio = orig_wrap_bio + + +@pytest.mark.parametrize( + "path,query,expected_url", + [ + ("/foo", "", "https://{}:{}/foo"), + ("/bar/baz", "", "https://{}:{}/bar/baz"), + ("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"), + ], +) +def test_url_attributes_with_ssl_context(app, path, query, expected_url): + context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) + context.load_cert_chain(localhost_cert, localhost_key) + + async def handler(request): + return text("OK") + + app.add_route(handler, path) + + port = app.test_client.port + request, response = app.test_client.get( + f"https://{HOST}:{PORT}" + path + f"?{query}", + server_kwargs={"ssl": context}, + ) + assert request.url == expected_url.format(HOST, request.server_port) + + parsed = urlparse(request.url) + + assert parsed.scheme == request.scheme + assert parsed.path == request.path + assert parsed.query == request.query_string + assert parsed.netloc == request.host + + +@pytest.mark.parametrize( + "path,query,expected_url", + [ + ("/foo", "", "https://{}:{}/foo"), + ("/bar/baz", "", "https://{}:{}/bar/baz"), + ("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"), + ], +) +def test_url_attributes_with_ssl_dict(app, path, query, expected_url): + ssl_dict = {"cert": localhost_cert, "key": localhost_key} + + async def handler(request): + return text("OK") + + app.add_route(handler, path) + + request, response = app.test_client.get( + f"https://{HOST}:{PORT}" + path + f"?{query}", + server_kwargs={"ssl": ssl_dict}, + ) + assert request.url == expected_url.format(HOST, request.server_port) + + parsed = urlparse(request.url) + + assert parsed.scheme == request.scheme + assert parsed.path == request.path + assert parsed.query == request.query_string + assert parsed.netloc == request.host + + +def test_cert_sni_single(app): + @app.get("/sni") + async def handler(request): + return text(request.conn_info.server_name) + + @app.get("/commonname") + async def handler(request): + return text(request.conn_info.cert.get("commonName")) + + port = app.test_client.port + request, response = app.test_client.get( + f"https://localhost:{port}/sni", + server_kwargs={"ssl": localhost_dir}, + ) + assert response.status == 200 + assert response.text == "localhost" + + request, response = app.test_client.get( + f"https://localhost:{port}/commonname", + server_kwargs={"ssl": localhost_dir}, + ) + assert response.status == 200 + assert response.text == "localhost" + + +def test_cert_sni_list(app): + ssl_list = [sanic_dir, localhost_dir] + + @app.get("/sni") + async def handler(request): + return text(request.conn_info.server_name) + + @app.get("/commonname") + async def handler(request): + return text(request.conn_info.cert.get("commonName")) + + # This test should match the localhost cert + port = app.test_client.port + request, response = app.test_client.get( + f"https://localhost:{port}/sni", + server_kwargs={"ssl": ssl_list}, + ) + assert response.status == 200 + assert response.text == "localhost" + + request, response = app.test_client.get( + f"https://localhost:{port}/commonname", + server_kwargs={"ssl": ssl_list}, + ) + assert response.status == 200 + assert response.text == "localhost" + + # This part should use the sanic.example cert because it matches + with replace_server_name("www.sanic.example"): + request, response = app.test_client.get( + f"https://127.0.0.1:{port}/sni", + server_kwargs={"ssl": ssl_list}, + ) + assert response.status == 200 + assert response.text == "www.sanic.example" + + request, response = app.test_client.get( + f"https://127.0.0.1:{port}/commonname", + server_kwargs={"ssl": ssl_list}, + ) + assert response.status == 200 + assert response.text == "sanic.example" + + # This part should use the sanic.example cert, that being the first listed + with replace_server_name("invalid.test"): + request, response = app.test_client.get( + f"https://127.0.0.1:{port}/sni", + server_kwargs={"ssl": ssl_list}, + ) + assert response.status == 200 + assert response.text == "invalid.test" + + request, response = app.test_client.get( + f"https://127.0.0.1:{port}/commonname", + server_kwargs={"ssl": ssl_list}, + ) + assert response.status == 200 + assert response.text == "sanic.example" + + +def test_missing_sni(app): + """The sanic cert does not list 127.0.0.1 and httpx does not send IP as SNI anyway.""" + ssl_list = [None, sanic_dir] + + @app.get("/sni") + async def handler(request): + return text(request.conn_info.server_name) + + port = app.test_client.port + with pytest.raises(Exception) as exc: + request, response = app.test_client.get( + f"https://127.0.0.1:{port}/sni", + server_kwargs={"ssl": ssl_list}, + ) + assert "Request and response object expected" in str(exc.value) + + +def test_no_matching_cert(app): + """The sanic cert does not list 127.0.0.1 and httpx does not send IP as SNI anyway.""" + ssl_list = [None, sanic_dir] + + @app.get("/sni") + async def handler(request): + return text(request.conn_info.server_name) + + port = app.test_client.port + with replace_server_name("invalid.test"): + with pytest.raises(Exception) as exc: + request, response = app.test_client.get( + f"https://127.0.0.1:{port}/sni", + server_kwargs={"ssl": ssl_list}, + ) + assert "Request and response object expected" in str(exc.value) + + +def test_wildcards(app): + ssl_list = [None, localhost_dir, sanic_dir] + + @app.get("/sni") + async def handler(request): + return text(request.conn_info.server_name) + + port = app.test_client.port + + with replace_server_name("foo.sanic.test"): + request, response = app.test_client.get( + f"https://127.0.0.1:{port}/sni", + server_kwargs={"ssl": ssl_list}, + ) + assert response.status == 200 + assert response.text == "foo.sanic.test" + + with replace_server_name("sanic.test"): + with pytest.raises(Exception) as exc: + request, response = app.test_client.get( + f"https://127.0.0.1:{port}/sni", + server_kwargs={"ssl": ssl_list}, + ) + assert "Request and response object expected" in str(exc.value) + with replace_server_name("sub.foo.sanic.test"): + with pytest.raises(Exception) as exc: + request, response = app.test_client.get( + f"https://127.0.0.1:{port}/sni", + server_kwargs={"ssl": ssl_list}, + ) + assert "Request and response object expected" in str(exc.value) + + +def test_invalid_ssl_dict(app): + @app.get("/test") + async def handler(request): + return text("ssl test") + + ssl_dict = {"cert": None, "key": None} + + with pytest.raises(ValueError) as excinfo: + request, response = app.test_client.get( + "/test", server_kwargs={"ssl": ssl_dict} + ) + + assert str(excinfo.value) == "SSL dict needs filenames for cert and key." + + +def test_invalid_ssl_type(app): + @app.get("/test") + async def handler(request): + return text("ssl test") + + with pytest.raises(ValueError) as excinfo: + request, response = app.test_client.get( + "/test", server_kwargs={"ssl": False} + ) + + assert "Invalid ssl argument" in str(excinfo.value) + + +def test_cert_file_on_pathlist(app): + @app.get("/test") + async def handler(request): + return text("ssl test") + + ssl_list = [sanic_cert] + + with pytest.raises(ValueError) as excinfo: + request, response = app.test_client.get( + "/test", server_kwargs={"ssl": ssl_list} + ) + + assert "folder expected" in str(excinfo.value) + assert sanic_cert in str(excinfo.value) + + +def test_missing_cert_path(app): + @app.get("/test") + async def handler(request): + return text("ssl test") + + ssl_list = [invalid_dir] + + with pytest.raises(ValueError) as excinfo: + request, response = app.test_client.get( + "/test", server_kwargs={"ssl": ssl_list} + ) + + assert "not found" in str(excinfo.value) + assert invalid_dir + "/privkey.pem" in str(excinfo.value) + + +def test_missing_cert_file(app): + @app.get("/test") + async def handler(request): + return text("ssl test") + + invalid2 = invalid_dir.replace("nonexist", "certmissing") + ssl_list = [invalid2] + + with pytest.raises(ValueError) as excinfo: + request, response = app.test_client.get( + "/test", server_kwargs={"ssl": ssl_list} + ) + + assert "not found" in str(excinfo.value) + assert invalid2 + "/fullchain.pem" in str(excinfo.value) + + +def test_no_certs_on_list(app): + @app.get("/test") + async def handler(request): + return text("ssl test") + + ssl_list = [None] + + with pytest.raises(ValueError) as excinfo: + request, response = app.test_client.get( + "/test", server_kwargs={"ssl": ssl_list} + ) + + assert "No certificates" in str(excinfo.value) + + +def test_logger_vhosts(caplog): + app = Sanic(name=__name__) + + @app.after_server_start + def stop(*args): + app.stop() + + with caplog.at_level(logging.INFO): + app.run(host="127.0.0.1", port=42102, ssl=[localhost_dir, sanic_dir]) + + logmsg = [ + m for s, l, m in caplog.record_tuples if m.startswith("Certificate") + ][0] + + assert logmsg == ( + "Certificate vhosts: localhost, 127.0.0.1, 0:0:0:0:0:0:0:1, sanic.example, www.sanic.example, *.sanic.test, 2001:DB8:0:0:0:0:0:541C" + )