Vhost support using multiple TLS certificates (#2270)

* Initial support for using multiple SSL certificates.

* Also list IP address subjectAltNames on log.

* Use Python 3.7+ way of specifying TLSv1.2 as the minimum version. Linter fixes.

* isort

* Cleanup, store server name for later use. Add RSA ciphers. Log rejected SNIs.

* Cleanup, linter.

* Alter the order of initial log messages and handling. In particular, enable debug mode early so that debug messages during init can be shown.

* Store server name (SNI) to conn_info.

* Update test with new error message.

* Refactor for readability.

* Cleanup

* Replace old expired test cert with new ones and a script for regenerating them as needed.

* Refactor TLS tests to a separate file.

* Add cryptography to dev deps for rebuilding TLS certs.

* Minor adjustment to messages.

* Tests added for new TLS code.

* Find the correct log row before testing for message. The order was different on CI.

* More log message order fixup. The tests do not account for the logo being printed first.

* Another attempt at log message indexing fixup.

* Major TLS refactoring.

CertSelector now allows dicts and SSLContext within its list.
Server names are stored even when no list is used.
SSLContext.sanic now contains a dict with any setting passed and information extracted from cert.
That information is available on request.conn_info.cert.
Type annotations added.
More tests incl. a handler for faking hostname in tests.

* Remove a problematic logger test that apparently was not adding any coverage or value to anything.

* Revert accidental commit of uvloop disable.

* Typing fixes / refactoring.

* Additional test for cert selection. Certs recreated without DNS:localhost on sanic.example cert.

* Add tests for single certificate path shorthand and SNI information.

* Move TLS dict processing to CertSimple, make the names field optional and use names from the cert if absent.

* Sanic CLI options --tls and --tls-strict-host to use the new features.

* SSL argument typing updated

* Use ValueError for internal message passing to avoid CertificateError's odd message formatting.

* Linter

* Test CLI TLS options.

* Maybe the right codeclimate option now...

* Improved TLS argument help, removed support for combining --cert/--key with --tls.

* Removed support for strict checking without any certs, black forced fscked up formatting.

* Update CLI tests for stricter TLS options.

Co-authored-by: L. Karkkainen <tronic@users.noreply.github.com>
Co-authored-by: Adam Hopkins <admhpkns@gmail.com>
This commit is contained in:
L. Kärkkäinen 2021-10-28 14:50:05 +01:00 committed by GitHub
parent 5b82884f8b
commit 6c7df68c7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 913 additions and 247 deletions

View File

@ -21,5 +21,7 @@ checks:
config: config:
threshold: 40 threshold: 40
complex-logic: complex-logic:
enabled: false
method-complexity:
config: config:
threshold: 10 threshold: 10

View File

@ -4,7 +4,7 @@ import sys
from argparse import ArgumentParser, RawTextHelpFormatter from argparse import ArgumentParser, RawTextHelpFormatter
from importlib import import_module from importlib import import_module
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Optional from typing import Union
from sanic_routing import __version__ as __routing_version__ # type: ignore from sanic_routing import __version__ as __routing_version__ # type: ignore
@ -79,10 +79,30 @@ def main():
help="location of unix socket\n ", help="location of unix socket\n ",
) )
parser.add_argument( parser.add_argument(
"--cert", dest="cert", type=str, help="Location of certificate for SSL" "--cert",
dest="cert",
type=str,
help="Location of fullchain.pem, bundle.crt or equivalent",
) )
parser.add_argument( parser.add_argument(
"--key", dest="key", type=str, help="location of keyfile for SSL\n " "--key",
dest="key",
type=str,
help="Location of privkey.pem or equivalent .key file",
)
parser.add_argument(
"--tls",
metavar="DIR",
type=str,
action="append",
help="TLS certificate folder with fullchain.pem and privkey.pem\n"
"May be specified multiple times to choose of multiple certificates",
)
parser.add_argument(
"--tls-strict-host",
dest="tlshost",
action="store_true",
help="Only allow clients that send an SNI matching server certs\n ",
) )
parser.add_bool_arguments( parser.add_bool_arguments(
"--access-logs", dest="access_log", help="display access logs" "--access-logs", dest="access_log", help="display access logs"
@ -126,6 +146,26 @@ def main():
) )
args = parser.parse_args() args = parser.parse_args()
# Custom TLS mismatch handling for better diagnostics
if (
# one of cert/key missing
bool(args.cert) != bool(args.key)
# new and old style args used together
or args.tls
and args.cert
# strict host checking without certs would always fail
or args.tlshost
and not args.tls
and not args.cert
):
parser.print_usage(sys.stderr)
error_logger.error(
"sanic: error: TLS certificates must be specified by either of:\n"
" --cert certdir/fullchain.pem --key certdir/privkey.pem\n"
" --tls certdir (equivalent to the above)"
)
sys.exit(1)
try: try:
module_path = os.path.abspath(os.getcwd()) module_path = os.path.abspath(os.getcwd())
if module_path not in sys.path: if module_path not in sys.path:
@ -155,14 +195,18 @@ def main():
f"Perhaps you meant {args.module}.app?" f"Perhaps you meant {args.module}.app?"
) )
ssl: Union[None, dict, str, list] = []
if args.tlshost:
ssl.append(None)
if args.cert is not None or args.key is not None: if args.cert is not None or args.key is not None:
ssl: Optional[Dict[str, Any]] = { ssl.append(dict(cert=args.cert, key=args.key))
"cert": args.cert, if args.tls:
"key": args.key, ssl += args.tls
} if not ssl:
else:
ssl = None ssl = None
elif len(ssl) == 1 and ssl[0] is not None:
# Use only one cert, no TLSSelector.
ssl = ssl[0]
kwargs = { kwargs = {
"host": args.host, "host": args.host,
"port": args.port, "port": args.port,

View File

@ -19,7 +19,7 @@ from functools import partial
from inspect import isawaitable from inspect import isawaitable
from pathlib import Path from pathlib import Path
from socket import socket from socket import socket
from ssl import Purpose, SSLContext, create_default_context from ssl import SSLContext
from traceback import format_exc from traceback import format_exc
from types import SimpleNamespace from types import SimpleNamespace
from typing import ( from typing import (
@ -78,6 +78,7 @@ from sanic.server import serve, serve_multiple, serve_single
from sanic.server.protocols.websocket_protocol import WebSocketProtocol from sanic.server.protocols.websocket_protocol import WebSocketProtocol
from sanic.server.websockets.impl import ConnectionClosed from sanic.server.websockets.impl import ConnectionClosed
from sanic.signals import Signal, SignalRouter from sanic.signals import Signal, SignalRouter
from sanic.tls import process_to_context
from sanic.touchup import TouchUp, TouchUpMeta from sanic.touchup import TouchUp, TouchUpMeta
@ -952,7 +953,7 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
*, *,
debug: bool = False, debug: bool = False,
auto_reload: Optional[bool] = None, auto_reload: Optional[bool] = None,
ssl: Union[Dict[str, str], SSLContext, None] = None, ssl: Union[None, SSLContext, dict, str, list, tuple] = None,
sock: Optional[socket] = None, sock: Optional[socket] = None,
workers: int = 1, workers: int = 1,
protocol: Optional[Type[Protocol]] = None, protocol: Optional[Type[Protocol]] = None,
@ -979,7 +980,7 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
:type auto_relaod: bool :type auto_relaod: bool
:param ssl: SSLContext, or location of certificate and key :param ssl: SSLContext, or location of certificate and key
for SSL encryption of worker(s) for SSL encryption of worker(s)
:type ssl: SSLContext or dict :type ssl: str, dict, SSLContext or list
:param sock: Socket for the server to accept connections from :param sock: Socket for the server to accept connections from
:type sock: socket :type sock: socket
:param workers: Number of processes received before it is respected :param workers: Number of processes received before it is respected
@ -1089,7 +1090,7 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
port: Optional[int] = None, port: Optional[int] = None,
*, *,
debug: bool = False, debug: bool = False,
ssl: Union[Dict[str, str], SSLContext, None] = None, ssl: Union[None, SSLContext, dict, str, list, tuple] = None,
sock: Optional[socket] = None, sock: Optional[socket] = None,
protocol: Type[Protocol] = None, protocol: Type[Protocol] = None,
backlog: int = 100, backlog: int = 100,
@ -1281,16 +1282,6 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
auto_reload=False, auto_reload=False,
): ):
"""Helper function used by `run` and `create_server`.""" """Helper function used by `run` and `create_server`."""
if isinstance(ssl, dict):
# try common aliaseses
cert = ssl.get("cert") or ssl.get("certificate")
key = ssl.get("key") or ssl.get("keyfile")
if cert is None or key is None:
raise ValueError("SSLContext or certificate and key required.")
context = create_default_context(purpose=Purpose.CLIENT_AUTH)
context.load_cert_chain(cert, keyfile=key)
ssl = context
if self.config.PROXIES_COUNT and self.config.PROXIES_COUNT < 0: if self.config.PROXIES_COUNT and self.config.PROXIES_COUNT < 0:
raise ValueError( raise ValueError(
"PROXIES_COUNT cannot be negative. " "PROXIES_COUNT cannot be negative. "
@ -1300,6 +1291,35 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
self.error_handler.debug = debug self.error_handler.debug = debug
self.debug = debug self.debug = debug
if self.configure_logging and debug:
logger.setLevel(logging.DEBUG)
if (
self.config.LOGO
and os.environ.get("SANIC_SERVER_RUNNING") != "true"
):
logger.debug(
self.config.LOGO
if isinstance(self.config.LOGO, str)
else BASE_LOGO
)
# Serve
if host and port:
proto = "http"
if ssl is not None:
proto = "https"
if unix:
logger.info(f"Goin' Fast @ {unix} {proto}://...")
else:
# colon(:) is legal for a host only in an ipv6 address
display_host = f"[{host}]" if ":" in host else host
logger.info(f"Goin' Fast @ {proto}://{display_host}:{port}")
debug_mode = "enabled" if self.debug else "disabled"
reload_mode = "enabled" if auto_reload else "disabled"
logger.debug(f"Sanic auto-reload: {reload_mode}")
logger.debug(f"Sanic debug mode: {debug_mode}")
ssl = process_to_context(ssl)
server_settings = { server_settings = {
"protocol": protocol, "protocol": protocol,
@ -1328,39 +1348,9 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
listeners = [partial(listener, self) for listener in listeners] listeners = [partial(listener, self) for listener in listeners]
server_settings[settings_name] = listeners server_settings[settings_name] = listeners
if self.configure_logging and debug:
logger.setLevel(logging.DEBUG)
if (
self.config.LOGO
and os.environ.get("SANIC_SERVER_RUNNING") != "true"
):
logger.debug(
self.config.LOGO
if isinstance(self.config.LOGO, str)
else BASE_LOGO
)
if run_async: if run_async:
server_settings["run_async"] = True server_settings["run_async"] = True
# Serve
if host and port:
proto = "http"
if ssl is not None:
proto = "https"
if unix:
logger.info(f"Goin' Fast @ {unix} {proto}://...")
else:
# colon(:) is legal for a host only in an ipv6 address
display_host = f"[{host}]" if ":" in host else host
logger.info(f"Goin' Fast @ {proto}://{display_host}:{port}")
debug_mode = "enabled" if self.debug else "disabled"
reload_mode = "enabled" if auto_reload else "disabled"
logger.debug(f"Sanic auto-reload: {reload_mode}")
logger.debug(f"Sanic debug mode: {debug_mode}")
return server_settings return server_settings
def _build_endpoint_name(self, *parts): def _build_endpoint_name(self, *parts):

View File

@ -1,4 +1,6 @@
from ssl import SSLObject
from types import SimpleNamespace from types import SimpleNamespace
from typing import Optional
from sanic.models.protocol_types import TransportProtocol from sanic.models.protocol_types import TransportProtocol
@ -20,8 +22,10 @@ class ConnInfo:
"peername", "peername",
"server_port", "server_port",
"server", "server",
"server_name",
"sockname", "sockname",
"ssl", "ssl",
"cert",
) )
def __init__(self, transport: TransportProtocol, unix=None): def __init__(self, transport: TransportProtocol, unix=None):
@ -31,8 +35,16 @@ class ConnInfo:
self.server_port = self.client_port = 0 self.server_port = self.client_port = 0
self.client_ip = "" self.client_ip = ""
self.sockname = addr = transport.get_extra_info("sockname") self.sockname = addr = transport.get_extra_info("sockname")
self.ssl: bool = bool(transport.get_extra_info("sslcontext")) self.ssl = False
self.server_name = ""
self.cert = {}
sslobj: Optional[SSLObject] = transport.get_extra_info(
"ssl_object"
) # type: ignore
if sslobj:
self.ssl = True
self.server_name = getattr(sslobj, "sanic_server_name", None) or ""
self.cert = getattr(sslobj.context, "sanic", {})
if isinstance(addr, str): # UNIX socket if isinstance(addr, str): # UNIX socket
self.server = unix or addr self.server = unix or addr
return return

196
sanic/tls.py Normal file
View File

@ -0,0 +1,196 @@
import os
import ssl
from typing import Iterable, Optional, Union
from sanic.log import logger
# Only allow secure ciphers, notably leaving out AES-CBC mode
# OpenSSL chooses ECDSA or RSA depending on the cert in use
CIPHERS_TLS12 = [
"ECDHE-ECDSA-CHACHA20-POLY1305",
"ECDHE-ECDSA-AES256-GCM-SHA384",
"ECDHE-ECDSA-AES128-GCM-SHA256",
"ECDHE-RSA-CHACHA20-POLY1305",
"ECDHE-RSA-AES256-GCM-SHA384",
"ECDHE-RSA-AES128-GCM-SHA256",
]
def create_context(
certfile: Optional[str] = None,
keyfile: Optional[str] = None,
password: Optional[str] = None,
) -> ssl.SSLContext:
"""Create a context with secure crypto and HTTP/1.1 in protocols."""
context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
context.minimum_version = ssl.TLSVersion.TLSv1_2
context.set_ciphers(":".join(CIPHERS_TLS12))
context.set_alpn_protocols(["http/1.1"])
context.sni_callback = server_name_callback
if certfile and keyfile:
context.load_cert_chain(certfile, keyfile, password)
return context
def shorthand_to_ctx(
ctxdef: Union[None, ssl.SSLContext, dict, str]
) -> Optional[ssl.SSLContext]:
"""Convert an ssl argument shorthand to an SSLContext object."""
if ctxdef is None or isinstance(ctxdef, ssl.SSLContext):
return ctxdef
if isinstance(ctxdef, str):
return load_cert_dir(ctxdef)
if isinstance(ctxdef, dict):
return CertSimple(**ctxdef)
raise ValueError(
f"Invalid ssl argument {type(ctxdef)}."
" Expecting a list of certdirs, a dict or an SSLContext."
)
def process_to_context(
ssldef: Union[None, ssl.SSLContext, dict, str, list, tuple]
) -> Optional[ssl.SSLContext]:
"""Process app.run ssl argument from easy formats to full SSLContext."""
return (
CertSelector(map(shorthand_to_ctx, ssldef))
if isinstance(ssldef, (list, tuple))
else shorthand_to_ctx(ssldef)
)
def load_cert_dir(p: str) -> ssl.SSLContext:
if os.path.isfile(p):
raise ValueError(f"Certificate folder expected but {p} is a file.")
keyfile = os.path.join(p, "privkey.pem")
certfile = os.path.join(p, "fullchain.pem")
if not os.access(keyfile, os.R_OK):
raise ValueError(
f"Certificate not found or permission denied {keyfile}"
)
if not os.access(certfile, os.R_OK):
raise ValueError(
f"Certificate not found or permission denied {certfile}"
)
return CertSimple(certfile, keyfile)
class CertSimple(ssl.SSLContext):
"""A wrapper for creating SSLContext with a sanic attribute."""
def __new__(cls, cert, key, **kw):
# try common aliases, rename to cert/key
certfile = kw["cert"] = kw.pop("certificate", None) or cert
keyfile = kw["key"] = kw.pop("keyfile", None) or key
password = kw.pop("password", None)
if not certfile or not keyfile:
raise ValueError("SSL dict needs filenames for cert and key.")
subject = {}
if "names" not in kw:
cert = ssl._ssl._test_decode_cert(certfile) # type: ignore
kw["names"] = [
name
for t, name in cert["subjectAltName"]
if t in ["DNS", "IP Address"]
]
subject = {k: v for item in cert["subject"] for k, v in item}
self = create_context(certfile, keyfile, password)
self.__class__ = cls
self.sanic = {**subject, **kw}
return self
def __init__(self, cert, key, **kw):
pass # Do not call super().__init__ because it is already initialized
class CertSelector(ssl.SSLContext):
"""Automatically select SSL certificate based on the hostname that the
client is trying to access, via SSL SNI. Paths to certificate folders
with privkey.pem and fullchain.pem in them should be provided, and
will be matched in the order given whenever there is a new connection.
"""
def __new__(cls, ctxs):
return super().__new__(cls)
def __init__(self, ctxs: Iterable[Optional[ssl.SSLContext]]):
super().__init__()
self.sni_callback = selector_sni_callback # type: ignore
self.sanic_select = []
self.sanic_fallback = None
all_names = []
for i, ctx in enumerate(ctxs):
if not ctx:
continue
names = getattr(ctx, "sanic", {}).get("names", [])
all_names += names
self.sanic_select.append(ctx)
if i == 0:
self.sanic_fallback = ctx
if not all_names:
raise ValueError(
"No certificates with SubjectAlternativeNames found."
)
logger.info(f"Certificate vhosts: {', '.join(all_names)}")
def find_cert(self: CertSelector, server_name: str):
"""Find the first certificate that matches the given SNI.
:raises ssl.CertificateError: No matching certificate found.
:return: A matching ssl.SSLContext object if found."""
if not server_name:
if self.sanic_fallback:
return self.sanic_fallback
raise ValueError(
"The client provided no SNI to match for certificate."
)
for ctx in self.sanic_select:
if match_hostname(ctx, server_name):
return ctx
if self.sanic_fallback:
return self.sanic_fallback
raise ValueError(f"No certificate found matching hostname {server_name!r}")
def match_hostname(
ctx: Union[ssl.SSLContext, CertSelector], hostname: str
) -> bool:
"""Match names from CertSelector against a received hostname."""
# Local certs are considered trusted, so this can be less pedantic
# and thus faster than the deprecated ssl.match_hostname function is.
names = getattr(ctx, "sanic", {}).get("names", [])
hostname = hostname.lower()
for name in names:
if name.startswith("*."):
if hostname.split(".", 1)[-1] == name[2:]:
return True
elif name == hostname:
return True
return False
def selector_sni_callback(
sslobj: ssl.SSLObject, server_name: str, ctx: CertSelector
) -> Optional[int]:
"""Select a certificate mathing the SNI."""
# Call server_name_callback to store the SNI on sslobj
server_name_callback(sslobj, server_name, ctx)
# Find a new context matching the hostname
try:
sslobj.context = find_cert(ctx, server_name)
except ValueError as e:
logger.warning(f"Rejecting TLS connection: {e}")
# This would show ERR_SSL_UNRECOGNIZED_NAME_ALERT on client side if
# asyncio/uvloop did proper SSL shutdown. They don't.
return ssl.ALERT_DESCRIPTION_UNRECOGNIZED_NAME
return None # mypy complains without explicit return
def server_name_callback(
sslobj: ssl.SSLObject, server_name: str, ctx: ssl.SSLContext
) -> None:
"""Store the received SNI as sslobj.sanic_server_name."""
sslobj.sanic_server_name = server_name # type: ignore

View File

@ -123,6 +123,7 @@ docs_require = [
] ]
dev_require = tests_require + [ dev_require = tests_require + [
"cryptography",
"tox", "tox",
"towncrier", "towncrier",
] ]

113
tests/certs/createcerts.py Normal file
View File

@ -0,0 +1,113 @@
from datetime import datetime, timedelta
from ipaddress import ip_address
from os import path
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.x509 import (
BasicConstraints,
CertificateBuilder,
DNSName,
ExtendedKeyUsage,
IPAddress,
KeyUsage,
Name,
NameAttribute,
SubjectAlternativeName,
random_serial_number,
)
from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID
def writefiles(key, cert):
cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
folder = path.join(path.dirname(__file__), cn)
with open(path.join(folder, "fullchain.pem"), "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
with open(path.join(folder, "privkey.pem"), "wb") as f:
f.write(
key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption(),
)
)
def selfsigned(key, common_name, san):
subject = issuer = Name(
[
NameAttribute(NameOID.COMMON_NAME, common_name),
NameAttribute(NameOID.ORGANIZATION_NAME, "Sanic Org"),
]
)
cert = (
CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(key.public_key())
.serial_number(random_serial_number())
.not_valid_before(datetime.utcnow())
.not_valid_after(datetime.utcnow() + timedelta(days=365.25 * 8))
.add_extension(
KeyUsage(
True, False, False, False, False, False, False, False, False
),
critical=True,
)
.add_extension(
ExtendedKeyUsage(
[
ExtendedKeyUsageOID.SERVER_AUTH,
ExtendedKeyUsageOID.CLIENT_AUTH,
]
),
critical=False,
)
.add_extension(
BasicConstraints(ca=True, path_length=None),
critical=True,
)
.add_extension(
SubjectAlternativeName(
[
IPAddress(ip_address(n))
if n[0].isdigit() or ":" in n
else DNSName(n)
for n in san
]
),
critical=False,
)
.sign(key, hashes.SHA256())
)
return cert
# Sanic example/test self-signed cert RSA
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
cert = selfsigned(
key,
"sanic.example",
[
"sanic.example",
"www.sanic.example",
"*.sanic.test",
"2001:db8::541c",
],
)
writefiles(key, cert)
# Sanic localhost self-signed cert ECDSA
key = ec.generate_private_key(ec.SECP256R1)
cert = selfsigned(
key,
"localhost",
[
"localhost",
"127.0.0.1",
"::1",
],
)
writefiles(key, cert)

View File

@ -0,0 +1,5 @@
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIFP3fCUob41U1wvVOvei4dGsXrZeSiBUCX/xVu9215bvoAoGCCqGSM49
AwEHoUQDQgAEvBHo/RatEnPRBeiLURXX2sQDBbr9XRb73Fvm8jIOrPyJg8PcvNXH
D1jQah5K60THdjmdkLsY/hamZfqLb24EFQ==
-----END EC PRIVATE KEY-----

View File

@ -0,0 +1,12 @@
-----BEGIN CERTIFICATE-----
MIIBwjCCAWigAwIBAgIUQOCJIPRMiZsOMmvH0uiofxEDFn8wCgYIKoZIzj0EAwIw
KDESMBAGA1UEAwwJbG9jYWxob3N0MRIwEAYDVQQKDAlTYW5pYyBPcmcwHhcNMjEx
MDE5MTcwMTE3WhcNMjkxMDE5MTcwMTE3WjAoMRIwEAYDVQQDDAlsb2NhbGhvc3Qx
EjAQBgNVBAoMCVNhbmljIE9yZzBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABHf0
SrvRtGF9KIXEtk4+6vsqleNaleuYVvf4d6TD3pX1CbOV/NsZdW6+EhkA1U2pEBnJ
txXqAGVJT4ans8ud3K6jcDBuMA4GA1UdDwEB/wQEAwIHgDAdBgNVHSUEFjAUBggr
BgEFBQcDAQYIKwYBBQUHAwIwDwYDVR0TAQH/BAUwAwEB/zAsBgNVHREEJTAjggls
b2NhbGhvc3SHBH8AAAGHEAAAAAAAAAAAAAAAAAAAAAEwCgYIKoZIzj0EAwIDSAAw
RQIhAJhwopVuiW0S4MKEDCl+Vxwyei5AYobrALcP0pwGpFzIAiAWkxMPeAOMWIjq
LD4t2UZ9h6ma2fS2Jf9pzTon6438Ng==
-----END CERTIFICATE-----

View File

@ -0,0 +1,5 @@
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIDKTs1c2Qo7KMQ8DJrmIuNb29z2fNi4O+TNkJWjvclvsoAoGCCqGSM49
AwEHoUQDQgAEd/RKu9G0YX0ohcS2Tj7q+yqV41qV65hW9/h3pMPelfUJs5X82xl1
br4SGQDVTakQGcm3FeoAZUlPhqezy53crg==
-----END EC PRIVATE KEY-----

View File

@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDdzCCAl+gAwIBAgIUF1H0To9k3mUiMT8mjF6g45A9KgcwDQYJKoZIhvcNAQEL
BQAwLDEWMBQGA1UEAwwNc2FuaWMuZXhhbXBsZTESMBAGA1UECgwJU2FuaWMgT3Jn
MB4XDTIxMTAxOTE3MDExN1oXDTI5MTAxOTE3MDExN1owLDEWMBQGA1UEAwwNc2Fu
aWMuZXhhbXBsZTESMBAGA1UECgwJU2FuaWMgT3JnMIIBIjANBgkqhkiG9w0BAQEF
AAOCAQ8AMIIBCgKCAQEAzNeC95zB5LRybz9Wl16+Q4kbOLgXlyUQVKhg9OZD1ChN
3T4Ya/KvChQmPWOWdF814NgkkNS1yHKXlORU2Ljbpqzr+WoOAwGVixbRTknjmI46
glUhCOJlGqxl16RfuYA2BWv0+At9jKBhT1tnrGVhfqldnxsb4FDh0JsFnrZN4/DB
z6x8PY1z0eQMgsyeKAfSTTnGXhkZzAQz6afuQbGZhe8vQIUTvwmnZiU9OdUZ6nLc
b7lSbIQ1edT6/xXUkbn5ixGsEQTf6JWLqEDLkqpo9sbkYmvMMQpj2pCgtaEjx7An
+hQe8Itv+i6h0KD3ARVeCBgdWEgXZTs7zKrmU77xfwIDAQABo4GQMIGNMA4GA1Ud
DwEB/wQEAwIHgDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDwYDVR0T
AQH/BAUwAwEB/zBLBgNVHREERDBCgg1zYW5pYy5leGFtcGxlghF3d3cuc2FuaWMu
ZXhhbXBsZYIMKi5zYW5pYy50ZXN0hxAgAQ24AAAAAAAAAAAAAFQcMA0GCSqGSIb3
DQEBCwUAA4IBAQBLV7xSEI7308Qmm3SyV+ro9jQ/i2ydwUIUyRMtf04EFRS8fHK/
Lln5Yweaba9XP5k3DLSC63Qg1tE50fVqQypbWVA4SMkMW21cK8vEhHEYeGYkHsuC
xCFdwJYhmofqWaQ/j/ErLBrQbaHBdSJ/Nou5RPRtM4HrSU7F2azLGmLczYk6PcZa
wSBvoXdjiEUrRl7XB0iB2ktTga6amuYz4bSJzUvaA8SodJzC4OKhRsduUD83LdDi
2As4KiTcSO/SOCaK2KmbPNBlTKMF4cpqysGMvmnGVWhECOG1PZItJkWNbbBV4XRR
qGmrey2JwDDeTYHFDHaND385/PSJKfSSGLNk
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAzNeC95zB5LRybz9Wl16+Q4kbOLgXlyUQVKhg9OZD1ChN3T4Y
a/KvChQmPWOWdF814NgkkNS1yHKXlORU2Ljbpqzr+WoOAwGVixbRTknjmI46glUh
COJlGqxl16RfuYA2BWv0+At9jKBhT1tnrGVhfqldnxsb4FDh0JsFnrZN4/DBz6x8
PY1z0eQMgsyeKAfSTTnGXhkZzAQz6afuQbGZhe8vQIUTvwmnZiU9OdUZ6nLcb7lS
bIQ1edT6/xXUkbn5ixGsEQTf6JWLqEDLkqpo9sbkYmvMMQpj2pCgtaEjx7An+hQe
8Itv+i6h0KD3ARVeCBgdWEgXZTs7zKrmU77xfwIDAQABAoIBABWKpG89wPY4M8CX
PJf2krOve3lfgruWXj1I58lZXdC13Fpj6VWQ0++PZuYVzwC18oiOsmm4tNU7l81E
pdeUuSSyEq7MBGU0iXFzGNfO1Wx5qJWENlEk3dUMRDmFQ7vSS9wOGljrfGyJgTJD
PofWsYYMcZgF1cylNNonM1QZf990hfd0JDfO6CHCloRe/pKIdVzIxQp+3Ju/3OPk
Gw5V+YnVrG4wdZbhOCW2hPp/TLdgFy/xHvrxkEkGx+2ZHGCw9uFj2LRZJwwuaO9p
LDzbyfbFlPWIHdPamdBvenZ6RNTf28+YsbiqwoOk5C286QYb/VDnT8UnG42hXS1I
p3m//qECgYEA7zXmMSBy1tkMQsuaAakOFfl2HfVL2rrW6/CH6BwcCHUD6Wr8wv6a
kPNhI6pqqnP6Xg8XqJXfyIVZOJYPQMQr69zni2y7b3jPOemVGTBSqN7UE71NZkHF
+HZov55bPuX/KD6qc/WAXCyEcISy9TmcA7cEN7ivmyXmbuSXEoiAjlsCgYEA2zgU
mzL6ObJ2555UOqzGCMx6o2KQqOgA1SGmYLBRX77I3fuvGj+DLo6/iuM0FcVV7alG
U/U6qqrSymtdRgeZXHziSVhLZKY/qobgKG2iO1F3DzqyZ94EK/v0XRS4UyiJma3f
lwVG/BcVnv+FKCYUo2JKGln0R8Wcm6D9Nxp0mq0CgYEAn0Dj+oreyZiAqCuCYV6a
SRjmgTVghcNj+HoPEQE9zIeSziBzHKKCZsQRRLxc/RPveBVWK99zt7zHVHvatcSk
dQeBg3olIyZr1+NhZv6b2V9YE7gwwkZBtZOnUwLrPmnCwJlPw5mLFlJw7bP6rHXp
HzQF887Z4lGOIv++cBE+fQcCgYEArF26BhXdHcSvLYsWW1RCGeT9gL4dVFGnZe2h
bmD0er3+Hlyo35CUyuS+wqvG5l9VIxt4CsfFKzBJsZMdsdSDx28CVf0wuqDlamXG
lsMtTkrNvJHAeV7eFN900kNaczhqiQVnys0BdXGJNI1g26Klk5nS/klAg7ZjXxME
RnFswbkCgYBG5OToLXM8pg3yTM9MHMSXFhnnd2MbBK2AySFah2P1V4xv1rJdklU0
9QRTd/hQmYGHioPIF9deU8YSWlj+FBimyoNfJ51YzFyp2maOSJq4Wxe1nv2DflRK
gh5pkl8FizoDnu8BHu1AjOfRQJ3/tCIi2XZJgBuCxyTjd1b6hVUhyg==
-----END RSA PRIVATE KEY-----

View File

@ -1,22 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIDtTCCAp2gAwIBAgIJAO6wb0FSc/rNMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV
BAYTAlVTMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMTcwMzAzMTUyODAzWhcNMTkxMTI4MTUyODAzWjBF
MQswCQYDVQQGEwJVUzETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
CgKCAQEAsy7Zb3p4yCEnUtPLwqeJrwj9u/ZmcFCrMAktFBx9hG6rY2r7mdB6Bflh
V5cUJXxnsNiDpYcxGhA8kry7pEork1vZ05DyZC9ulVlvxBouVShBcLLwdpaoTGqE
vYtejv6x7ogwMXOjkWWb1WpOv4CVhpeXJ7O/d1uAiYgcUpTpPp4ONG49IAouBHq3
h+o4nVvNfB0J8gaCtTsTZqi1Wt8WYs3XjxGJaKh//ealfRe1kuv40CWQ8gjaC8/1
w9pHdom3Wi/RwfDM3+dVGV6M5lAbPXMB4RK17Hk9P3hlJxJOpKBdgcBJPXtNrTwf
qEWWxk2mB/YVyB84AxjkkNoYyi2ggQIDAQABo4GnMIGkMB0GA1UdDgQWBBRa46Ix
9s9tmMqu+Zz1mocHghm4NTB1BgNVHSMEbjBsgBRa46Ix9s9tmMqu+Zz1mocHghm4
NaFJpEcwRTELMAkGA1UEBhMCVVMxEzARBgNVBAgTClNvbWUtU3RhdGUxITAfBgNV
BAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAO6wb0FSc/rNMAwGA1UdEwQF
MAMBAf8wDQYJKoZIhvcNAQELBQADggEBACdrnM8zb7abxAJsU5WLn1IR0f2+EFA7
ezBEJBM4bn0IZrXuP5ThZ2wieJlshG0C16XN9+zifavHci+AtQwWsB0f/ppHdvWQ
7wt7JN88w+j0DNIYEadRCjWxR3gRAXPgKu3sdyScKFq8MvB49A2EdXRmQSTIM6Fj
teRbE+poxewFT0mhurf3xrtGiSALmv7uAzhRDqpYUzcUlbOGgkyFLYAOOdvZvei+
mfXDi4HKYxgyv53JxBARMdajnCHXM7zQ6Tjc8j1HRtmDQ3XapUB559KfxfODGQq5
zmeoZWU4duxcNXJM0Eiz1CJ39JoWwi8sqaGi/oskuyAh7YKyVTn8xa8=
-----END CERTIFICATE-----

View File

@ -1,27 +0,0 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAsy7Zb3p4yCEnUtPLwqeJrwj9u/ZmcFCrMAktFBx9hG6rY2r7
mdB6BflhV5cUJXxnsNiDpYcxGhA8kry7pEork1vZ05DyZC9ulVlvxBouVShBcLLw
dpaoTGqEvYtejv6x7ogwMXOjkWWb1WpOv4CVhpeXJ7O/d1uAiYgcUpTpPp4ONG49
IAouBHq3h+o4nVvNfB0J8gaCtTsTZqi1Wt8WYs3XjxGJaKh//ealfRe1kuv40CWQ
8gjaC8/1w9pHdom3Wi/RwfDM3+dVGV6M5lAbPXMB4RK17Hk9P3hlJxJOpKBdgcBJ
PXtNrTwfqEWWxk2mB/YVyB84AxjkkNoYyi2ggQIDAQABAoIBAFgVasxTf3aaXbNo
7JzXMWb7W4iAG2GRNmZZzHA7hTSKFvS7jc3SX3n6WvDtEvlOi8ay2RyRNgEjBDP6
VZ/w2jUJjS5k7dN0Qb9nhPr5B9fS/0CAppcVfsx5/KEVFzniWOPyzQYyW7FJKu8h
4G5hrp/Ie4UH5tKtB6YUZB/wliyyQUkAZdBcoy1hfkOZLAXb1oofArKsiQUHIRA5
th1yyS4cZP8Upngd1EE+d95dFHM2F6iI2lj6DHuu+JxUZ+wKXoNimdG7JniRtIf4
56GoDov83Ey+XbIS6FSQc9nY0ijBDcubl/yP3roCQpE+MZ9BNEo5uj7YmCtAMYLW
TXTNBGUCgYEA4wdkH1NLdub2NcpqwmSA0AtbRvDkt0XTDWWwmuMr/+xPVa4sUKHs
80THQEX/WAZroP6IPbMP6BJhzb53vECukgC65qPxu6M9D1lBGtglxgen4AMu1bKK
gnM8onwARGIo/2ay6qRRZZCxg0TvBky3hbTcIM2zVrnKU6VVyGKHSV8CgYEAygxs
WQYrACv3XN6ZEzyxy08JgjbcnkPWK/m3VPcyHgdEkDu8+nDdUVdbF/js2JWMMx5g
vrPhZ7jVLOXGcLr5mVU4dG5tW5lU0bMy+YYxpEQDiBKlpXgfOsQnakHj7cCZ6bay
mKjJck2oEAQS9bqOJN/Ts5vhOmc8rmhkO7hnAh8CgYEArhVDy9Vl/1WYo6SD+m1w
bJbYtewPpQzwicxZAFuDqKk+KDf3GRkhBWTO2FUUOB4sN3YVaCI+5zf5MPeE/qAm
fCP9LM+3k6bXMkbBamEljdTfACHQruJJ3T+Z1gn5dnZCc5z/QncfRx8NTtfz5MO8
0dTeGnVAuBacs0kLHy2WCUcCgYALNBkl7pOf1NBIlAdE686oCV/rmoMtO3G6yoQB
8BsVUy3YGZfnAy8ifYeNkr3/XHuDsiGHMY5EJBmd/be9NID2oaUZv63MsHnljtw6
vdgu1Z6kgvQwcrK4nXvaBoFPA6kFLp5EnMde0TOKf89VVNzg6pBgmzon9OWGfj9g
mF8N3QKBgQCeoLwxUxpzEA0CPHm7DWF0LefVGllgZ23Eqncdy0QRku5zwwibszbL
sWaR3uDCc3oYcbSGCDVx3cSkvMAJNalc5ZHPfoV9W0+v392/rrExo5iwD8CSoCb2
gFWkeR7PBrD3NzFzFAWyiudzhBKHfRsB0MpCXbJV/WLqTlGIbEypjg==
-----END RSA PRIVATE KEY-----

View File

@ -45,6 +45,58 @@ def test_server_run(appname):
assert firstline == b"Goin' Fast @ http://127.0.0.1:8000" assert firstline == b"Goin' Fast @ http://127.0.0.1:8000"
@pytest.mark.parametrize(
"cmd",
(
(
"--cert=certs/sanic.example/fullchain.pem",
"--key=certs/sanic.example/privkey.pem",
),
(
"--tls=certs/sanic.example/",
"--tls=certs/localhost/",
),
(
"--tls=certs/sanic.example/",
"--tls=certs/localhost/",
"--tls-strict-host",
),
),
)
def test_tls_options(cmd):
command = ["sanic", "fake.server.app", *cmd, "-p=9999", "--debug"]
out, err, exitcode = capture(command)
assert exitcode != 1
lines = out.split(b"\n")
firstline = lines[6]
assert firstline == b"Goin' Fast @ https://127.0.0.1:9999"
@pytest.mark.parametrize(
"cmd",
(
(
"--cert=certs/sanic.example/fullchain.pem",
),
(
"--cert=certs/sanic.example/fullchain.pem",
"--key=certs/sanic.example/privkey.pem",
"--tls=certs/localhost/",
),
(
"--tls-strict-host",
),
),
)
def test_tls_wrong_options(cmd):
command = ["sanic", "fake.server.app", *cmd, "-p=9999", "--debug"]
out, err, exitcode = capture(command)
assert exitcode == 1
assert not out
errmsg = err.decode().split("sanic: error: ")[1].split("\n")[0]
assert errmsg == "TLS certificates must be specified by either of:"
@pytest.mark.parametrize( @pytest.mark.parametrize(
"cmd", "cmd",
( (

View File

@ -1,6 +1,4 @@
import logging import logging
import os
import sys
import uuid import uuid
from importlib import reload from importlib import reload
@ -9,12 +7,9 @@ from unittest.mock import Mock
import pytest import pytest
from sanic_testing.testing import SanicTestClient
import sanic import sanic
from sanic import Sanic from sanic import Sanic
from sanic.compat import OS_IS_WINDOWS
from sanic.log import LOGGING_CONFIG_DEFAULTS, logger from sanic.log import LOGGING_CONFIG_DEFAULTS, logger
from sanic.response import text from sanic.response import text
@ -155,56 +150,6 @@ async def test_logger(caplog):
assert record in caplog.record_tuples assert record in caplog.record_tuples
@pytest.mark.skipif(
OS_IS_WINDOWS and sys.version_info >= (3, 8),
reason="Not testable with current client",
)
def test_logger_static_and_secure(caplog):
# Same as test_logger, except for more coverage:
# - test_client initialised separately for static port
# - using ssl
rand_string = str(uuid.uuid4())
app = Sanic(name=__name__)
@app.get("/")
def log_info(request):
logger.info(rand_string)
return text("hello")
current_dir = os.path.dirname(os.path.realpath(__file__))
ssl_cert = os.path.join(current_dir, "certs/selfsigned.cert")
ssl_key = os.path.join(current_dir, "certs/selfsigned.key")
ssl_dict = {"cert": ssl_cert, "key": ssl_key}
test_client = SanicTestClient(app, port=42101)
with caplog.at_level(logging.INFO):
request, response = test_client.get(
f"https://127.0.0.1:{test_client.port}/",
server_kwargs=dict(ssl=ssl_dict),
)
port = test_client.port
assert caplog.record_tuples[0] == (
"sanic.root",
logging.INFO,
f"Goin' Fast @ https://127.0.0.1:{port}",
)
assert caplog.record_tuples[1] == (
"sanic.root",
logging.INFO,
f"https://127.0.0.1:{port}/",
)
assert caplog.record_tuples[2] == ("sanic.root", logging.INFO, rand_string)
assert caplog.record_tuples[-1] == (
"sanic.root",
logging.INFO,
"Server Stopped",
)
def test_logging_modified_root_logger_config(): def test_logging_modified_root_logger_config():
# reset_logging() # reset_logging()

View File

@ -1,6 +1,4 @@
import logging import logging
import os
import ssl
from json import dumps as json_dumps from json import dumps as json_dumps
from json import loads as json_loads from json import loads as json_loads
@ -1119,92 +1117,6 @@ async def test_url_attributes_no_ssl_asgi(app, path, query, expected_url):
assert parsed.netloc == request.host assert parsed.netloc == request.host
@pytest.mark.parametrize(
"path,query,expected_url",
[
("/foo", "", "https://{}:{}/foo"),
("/bar/baz", "", "https://{}:{}/bar/baz"),
("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"),
],
)
def test_url_attributes_with_ssl_context(app, path, query, expected_url):
current_dir = os.path.dirname(os.path.realpath(__file__))
context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(
os.path.join(current_dir, "certs/selfsigned.cert"),
keyfile=os.path.join(current_dir, "certs/selfsigned.key"),
)
async def handler(request):
return text("OK")
app.add_route(handler, path)
port = app.test_client.port
request, response = app.test_client.get(
f"https://{HOST}:{PORT}" + path + f"?{query}",
server_kwargs={"ssl": context},
)
assert request.url == expected_url.format(HOST, request.server_port)
parsed = urlparse(request.url)
assert parsed.scheme == request.scheme
assert parsed.path == request.path
assert parsed.query == request.query_string
assert parsed.netloc == request.host
@pytest.mark.parametrize(
"path,query,expected_url",
[
("/foo", "", "https://{}:{}/foo"),
("/bar/baz", "", "https://{}:{}/bar/baz"),
("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"),
],
)
def test_url_attributes_with_ssl_dict(app, path, query, expected_url):
current_dir = os.path.dirname(os.path.realpath(__file__))
ssl_cert = os.path.join(current_dir, "certs/selfsigned.cert")
ssl_key = os.path.join(current_dir, "certs/selfsigned.key")
ssl_dict = {"cert": ssl_cert, "key": ssl_key}
async def handler(request):
return text("OK")
app.add_route(handler, path)
request, response = app.test_client.get(
f"https://{HOST}:{PORT}" + path + f"?{query}",
server_kwargs={"ssl": ssl_dict},
)
assert request.url == expected_url.format(HOST, request.server_port)
parsed = urlparse(request.url)
assert parsed.scheme == request.scheme
assert parsed.path == request.path
assert parsed.query == request.query_string
assert parsed.netloc == request.host
def test_invalid_ssl_dict(app):
@app.get("/test")
async def handler(request):
return text("ssl test")
ssl_dict = {"cert": None, "key": None}
with pytest.raises(ValueError) as excinfo:
request, response = app.test_client.get(
"/test", server_kwargs={"ssl": ssl_dict}
)
assert str(excinfo.value) == "SSLContext or certificate and key required."
def test_form_with_multiple_values(app): def test_form_with_multiple_values(app):
@app.route("/", methods=["POST"]) @app.route("/", methods=["POST"])
async def handler(request): async def handler(request):

378
tests/test_tls.py Normal file
View File

@ -0,0 +1,378 @@
import logging
import os
import ssl
import uuid
from contextlib import contextmanager
from urllib.parse import urlparse
import pytest
from sanic_testing.testing import HOST, PORT, SanicTestClient
from sanic import Sanic
from sanic.compat import OS_IS_WINDOWS
from sanic.log import logger
from sanic.response import text
current_dir = os.path.dirname(os.path.realpath(__file__))
localhost_dir = os.path.join(current_dir, "certs/localhost")
sanic_dir = os.path.join(current_dir, "certs/sanic.example")
invalid_dir = os.path.join(current_dir, "certs/invalid.nonexist")
localhost_cert = os.path.join(localhost_dir, "fullchain.pem")
localhost_key = os.path.join(localhost_dir, "privkey.pem")
sanic_cert = os.path.join(sanic_dir, "fullchain.pem")
sanic_key = os.path.join(sanic_dir, "privkey.pem")
@contextmanager
def replace_server_name(hostname):
"""Temporarily replace the server name sent with all TLS requests with a fake hostname."""
def hack_wrap_bio(
self,
incoming,
outgoing,
server_side=False,
server_hostname=None,
session=None,
):
return orig_wrap_bio(
self, incoming, outgoing, server_side, hostname, session
)
orig_wrap_bio, ssl.SSLContext.wrap_bio = (
ssl.SSLContext.wrap_bio,
hack_wrap_bio,
)
try:
yield
finally:
ssl.SSLContext.wrap_bio = orig_wrap_bio
@pytest.mark.parametrize(
"path,query,expected_url",
[
("/foo", "", "https://{}:{}/foo"),
("/bar/baz", "", "https://{}:{}/bar/baz"),
("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"),
],
)
def test_url_attributes_with_ssl_context(app, path, query, expected_url):
context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(localhost_cert, localhost_key)
async def handler(request):
return text("OK")
app.add_route(handler, path)
port = app.test_client.port
request, response = app.test_client.get(
f"https://{HOST}:{PORT}" + path + f"?{query}",
server_kwargs={"ssl": context},
)
assert request.url == expected_url.format(HOST, request.server_port)
parsed = urlparse(request.url)
assert parsed.scheme == request.scheme
assert parsed.path == request.path
assert parsed.query == request.query_string
assert parsed.netloc == request.host
@pytest.mark.parametrize(
"path,query,expected_url",
[
("/foo", "", "https://{}:{}/foo"),
("/bar/baz", "", "https://{}:{}/bar/baz"),
("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"),
],
)
def test_url_attributes_with_ssl_dict(app, path, query, expected_url):
ssl_dict = {"cert": localhost_cert, "key": localhost_key}
async def handler(request):
return text("OK")
app.add_route(handler, path)
request, response = app.test_client.get(
f"https://{HOST}:{PORT}" + path + f"?{query}",
server_kwargs={"ssl": ssl_dict},
)
assert request.url == expected_url.format(HOST, request.server_port)
parsed = urlparse(request.url)
assert parsed.scheme == request.scheme
assert parsed.path == request.path
assert parsed.query == request.query_string
assert parsed.netloc == request.host
def test_cert_sni_single(app):
@app.get("/sni")
async def handler(request):
return text(request.conn_info.server_name)
@app.get("/commonname")
async def handler(request):
return text(request.conn_info.cert.get("commonName"))
port = app.test_client.port
request, response = app.test_client.get(
f"https://localhost:{port}/sni",
server_kwargs={"ssl": localhost_dir},
)
assert response.status == 200
assert response.text == "localhost"
request, response = app.test_client.get(
f"https://localhost:{port}/commonname",
server_kwargs={"ssl": localhost_dir},
)
assert response.status == 200
assert response.text == "localhost"
def test_cert_sni_list(app):
ssl_list = [sanic_dir, localhost_dir]
@app.get("/sni")
async def handler(request):
return text(request.conn_info.server_name)
@app.get("/commonname")
async def handler(request):
return text(request.conn_info.cert.get("commonName"))
# This test should match the localhost cert
port = app.test_client.port
request, response = app.test_client.get(
f"https://localhost:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert response.status == 200
assert response.text == "localhost"
request, response = app.test_client.get(
f"https://localhost:{port}/commonname",
server_kwargs={"ssl": ssl_list},
)
assert response.status == 200
assert response.text == "localhost"
# This part should use the sanic.example cert because it matches
with replace_server_name("www.sanic.example"):
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert response.status == 200
assert response.text == "www.sanic.example"
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/commonname",
server_kwargs={"ssl": ssl_list},
)
assert response.status == 200
assert response.text == "sanic.example"
# This part should use the sanic.example cert, that being the first listed
with replace_server_name("invalid.test"):
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert response.status == 200
assert response.text == "invalid.test"
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/commonname",
server_kwargs={"ssl": ssl_list},
)
assert response.status == 200
assert response.text == "sanic.example"
def test_missing_sni(app):
"""The sanic cert does not list 127.0.0.1 and httpx does not send IP as SNI anyway."""
ssl_list = [None, sanic_dir]
@app.get("/sni")
async def handler(request):
return text(request.conn_info.server_name)
port = app.test_client.port
with pytest.raises(Exception) as exc:
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert "Request and response object expected" in str(exc.value)
def test_no_matching_cert(app):
"""The sanic cert does not list 127.0.0.1 and httpx does not send IP as SNI anyway."""
ssl_list = [None, sanic_dir]
@app.get("/sni")
async def handler(request):
return text(request.conn_info.server_name)
port = app.test_client.port
with replace_server_name("invalid.test"):
with pytest.raises(Exception) as exc:
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert "Request and response object expected" in str(exc.value)
def test_wildcards(app):
ssl_list = [None, localhost_dir, sanic_dir]
@app.get("/sni")
async def handler(request):
return text(request.conn_info.server_name)
port = app.test_client.port
with replace_server_name("foo.sanic.test"):
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert response.status == 200
assert response.text == "foo.sanic.test"
with replace_server_name("sanic.test"):
with pytest.raises(Exception) as exc:
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert "Request and response object expected" in str(exc.value)
with replace_server_name("sub.foo.sanic.test"):
with pytest.raises(Exception) as exc:
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert "Request and response object expected" in str(exc.value)
def test_invalid_ssl_dict(app):
@app.get("/test")
async def handler(request):
return text("ssl test")
ssl_dict = {"cert": None, "key": None}
with pytest.raises(ValueError) as excinfo:
request, response = app.test_client.get(
"/test", server_kwargs={"ssl": ssl_dict}
)
assert str(excinfo.value) == "SSL dict needs filenames for cert and key."
def test_invalid_ssl_type(app):
@app.get("/test")
async def handler(request):
return text("ssl test")
with pytest.raises(ValueError) as excinfo:
request, response = app.test_client.get(
"/test", server_kwargs={"ssl": False}
)
assert "Invalid ssl argument" in str(excinfo.value)
def test_cert_file_on_pathlist(app):
@app.get("/test")
async def handler(request):
return text("ssl test")
ssl_list = [sanic_cert]
with pytest.raises(ValueError) as excinfo:
request, response = app.test_client.get(
"/test", server_kwargs={"ssl": ssl_list}
)
assert "folder expected" in str(excinfo.value)
assert sanic_cert in str(excinfo.value)
def test_missing_cert_path(app):
@app.get("/test")
async def handler(request):
return text("ssl test")
ssl_list = [invalid_dir]
with pytest.raises(ValueError) as excinfo:
request, response = app.test_client.get(
"/test", server_kwargs={"ssl": ssl_list}
)
assert "not found" in str(excinfo.value)
assert invalid_dir + "/privkey.pem" in str(excinfo.value)
def test_missing_cert_file(app):
@app.get("/test")
async def handler(request):
return text("ssl test")
invalid2 = invalid_dir.replace("nonexist", "certmissing")
ssl_list = [invalid2]
with pytest.raises(ValueError) as excinfo:
request, response = app.test_client.get(
"/test", server_kwargs={"ssl": ssl_list}
)
assert "not found" in str(excinfo.value)
assert invalid2 + "/fullchain.pem" in str(excinfo.value)
def test_no_certs_on_list(app):
@app.get("/test")
async def handler(request):
return text("ssl test")
ssl_list = [None]
with pytest.raises(ValueError) as excinfo:
request, response = app.test_client.get(
"/test", server_kwargs={"ssl": ssl_list}
)
assert "No certificates" in str(excinfo.value)
def test_logger_vhosts(caplog):
app = Sanic(name=__name__)
@app.after_server_start
def stop(*args):
app.stop()
with caplog.at_level(logging.INFO):
app.run(host="127.0.0.1", port=42102, ssl=[localhost_dir, sanic_dir])
logmsg = [
m for s, l, m in caplog.record_tuples if m.startswith("Certificate")
][0]
assert logmsg == (
"Certificate vhosts: localhost, 127.0.0.1, 0:0:0:0:0:0:0:1, sanic.example, www.sanic.example, *.sanic.test, 2001:DB8:0:0:0:0:0:541C"
)