Merge branch 'main' into feat/optional-uvloop-use

This commit is contained in:
Néstor Pérez
2021-10-31 12:47:22 +01:00
committed by GitHub
33 changed files with 1077 additions and 286 deletions

113
tests/certs/createcerts.py Normal file
View File

@@ -0,0 +1,113 @@
from datetime import datetime, timedelta
from ipaddress import ip_address
from os import path
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.x509 import (
BasicConstraints,
CertificateBuilder,
DNSName,
ExtendedKeyUsage,
IPAddress,
KeyUsage,
Name,
NameAttribute,
SubjectAlternativeName,
random_serial_number,
)
from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID
def writefiles(key, cert):
cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
folder = path.join(path.dirname(__file__), cn)
with open(path.join(folder, "fullchain.pem"), "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
with open(path.join(folder, "privkey.pem"), "wb") as f:
f.write(
key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption(),
)
)
def selfsigned(key, common_name, san):
subject = issuer = Name(
[
NameAttribute(NameOID.COMMON_NAME, common_name),
NameAttribute(NameOID.ORGANIZATION_NAME, "Sanic Org"),
]
)
cert = (
CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(key.public_key())
.serial_number(random_serial_number())
.not_valid_before(datetime.utcnow())
.not_valid_after(datetime.utcnow() + timedelta(days=365.25 * 8))
.add_extension(
KeyUsage(
True, False, False, False, False, False, False, False, False
),
critical=True,
)
.add_extension(
ExtendedKeyUsage(
[
ExtendedKeyUsageOID.SERVER_AUTH,
ExtendedKeyUsageOID.CLIENT_AUTH,
]
),
critical=False,
)
.add_extension(
BasicConstraints(ca=True, path_length=None),
critical=True,
)
.add_extension(
SubjectAlternativeName(
[
IPAddress(ip_address(n))
if n[0].isdigit() or ":" in n
else DNSName(n)
for n in san
]
),
critical=False,
)
.sign(key, hashes.SHA256())
)
return cert
# Sanic example/test self-signed cert RSA
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
cert = selfsigned(
key,
"sanic.example",
[
"sanic.example",
"www.sanic.example",
"*.sanic.test",
"2001:db8::541c",
],
)
writefiles(key, cert)
# Sanic localhost self-signed cert ECDSA
key = ec.generate_private_key(ec.SECP256R1)
cert = selfsigned(
key,
"localhost",
[
"localhost",
"127.0.0.1",
"::1",
],
)
writefiles(key, cert)

View File

@@ -0,0 +1,5 @@
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIFP3fCUob41U1wvVOvei4dGsXrZeSiBUCX/xVu9215bvoAoGCCqGSM49
AwEHoUQDQgAEvBHo/RatEnPRBeiLURXX2sQDBbr9XRb73Fvm8jIOrPyJg8PcvNXH
D1jQah5K60THdjmdkLsY/hamZfqLb24EFQ==
-----END EC PRIVATE KEY-----

View File

@@ -0,0 +1,12 @@
-----BEGIN CERTIFICATE-----
MIIBwjCCAWigAwIBAgIUQOCJIPRMiZsOMmvH0uiofxEDFn8wCgYIKoZIzj0EAwIw
KDESMBAGA1UEAwwJbG9jYWxob3N0MRIwEAYDVQQKDAlTYW5pYyBPcmcwHhcNMjEx
MDE5MTcwMTE3WhcNMjkxMDE5MTcwMTE3WjAoMRIwEAYDVQQDDAlsb2NhbGhvc3Qx
EjAQBgNVBAoMCVNhbmljIE9yZzBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABHf0
SrvRtGF9KIXEtk4+6vsqleNaleuYVvf4d6TD3pX1CbOV/NsZdW6+EhkA1U2pEBnJ
txXqAGVJT4ans8ud3K6jcDBuMA4GA1UdDwEB/wQEAwIHgDAdBgNVHSUEFjAUBggr
BgEFBQcDAQYIKwYBBQUHAwIwDwYDVR0TAQH/BAUwAwEB/zAsBgNVHREEJTAjggls
b2NhbGhvc3SHBH8AAAGHEAAAAAAAAAAAAAAAAAAAAAEwCgYIKoZIzj0EAwIDSAAw
RQIhAJhwopVuiW0S4MKEDCl+Vxwyei5AYobrALcP0pwGpFzIAiAWkxMPeAOMWIjq
LD4t2UZ9h6ma2fS2Jf9pzTon6438Ng==
-----END CERTIFICATE-----

View File

@@ -0,0 +1,5 @@
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIDKTs1c2Qo7KMQ8DJrmIuNb29z2fNi4O+TNkJWjvclvsoAoGCCqGSM49
AwEHoUQDQgAEd/RKu9G0YX0ohcS2Tj7q+yqV41qV65hW9/h3pMPelfUJs5X82xl1
br4SGQDVTakQGcm3FeoAZUlPhqezy53crg==
-----END EC PRIVATE KEY-----

View File

@@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDdzCCAl+gAwIBAgIUF1H0To9k3mUiMT8mjF6g45A9KgcwDQYJKoZIhvcNAQEL
BQAwLDEWMBQGA1UEAwwNc2FuaWMuZXhhbXBsZTESMBAGA1UECgwJU2FuaWMgT3Jn
MB4XDTIxMTAxOTE3MDExN1oXDTI5MTAxOTE3MDExN1owLDEWMBQGA1UEAwwNc2Fu
aWMuZXhhbXBsZTESMBAGA1UECgwJU2FuaWMgT3JnMIIBIjANBgkqhkiG9w0BAQEF
AAOCAQ8AMIIBCgKCAQEAzNeC95zB5LRybz9Wl16+Q4kbOLgXlyUQVKhg9OZD1ChN
3T4Ya/KvChQmPWOWdF814NgkkNS1yHKXlORU2Ljbpqzr+WoOAwGVixbRTknjmI46
glUhCOJlGqxl16RfuYA2BWv0+At9jKBhT1tnrGVhfqldnxsb4FDh0JsFnrZN4/DB
z6x8PY1z0eQMgsyeKAfSTTnGXhkZzAQz6afuQbGZhe8vQIUTvwmnZiU9OdUZ6nLc
b7lSbIQ1edT6/xXUkbn5ixGsEQTf6JWLqEDLkqpo9sbkYmvMMQpj2pCgtaEjx7An
+hQe8Itv+i6h0KD3ARVeCBgdWEgXZTs7zKrmU77xfwIDAQABo4GQMIGNMA4GA1Ud
DwEB/wQEAwIHgDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDwYDVR0T
AQH/BAUwAwEB/zBLBgNVHREERDBCgg1zYW5pYy5leGFtcGxlghF3d3cuc2FuaWMu
ZXhhbXBsZYIMKi5zYW5pYy50ZXN0hxAgAQ24AAAAAAAAAAAAAFQcMA0GCSqGSIb3
DQEBCwUAA4IBAQBLV7xSEI7308Qmm3SyV+ro9jQ/i2ydwUIUyRMtf04EFRS8fHK/
Lln5Yweaba9XP5k3DLSC63Qg1tE50fVqQypbWVA4SMkMW21cK8vEhHEYeGYkHsuC
xCFdwJYhmofqWaQ/j/ErLBrQbaHBdSJ/Nou5RPRtM4HrSU7F2azLGmLczYk6PcZa
wSBvoXdjiEUrRl7XB0iB2ktTga6amuYz4bSJzUvaA8SodJzC4OKhRsduUD83LdDi
2As4KiTcSO/SOCaK2KmbPNBlTKMF4cpqysGMvmnGVWhECOG1PZItJkWNbbBV4XRR
qGmrey2JwDDeTYHFDHaND385/PSJKfSSGLNk
-----END CERTIFICATE-----

View File

@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAzNeC95zB5LRybz9Wl16+Q4kbOLgXlyUQVKhg9OZD1ChN3T4Y
a/KvChQmPWOWdF814NgkkNS1yHKXlORU2Ljbpqzr+WoOAwGVixbRTknjmI46glUh
COJlGqxl16RfuYA2BWv0+At9jKBhT1tnrGVhfqldnxsb4FDh0JsFnrZN4/DBz6x8
PY1z0eQMgsyeKAfSTTnGXhkZzAQz6afuQbGZhe8vQIUTvwmnZiU9OdUZ6nLcb7lS
bIQ1edT6/xXUkbn5ixGsEQTf6JWLqEDLkqpo9sbkYmvMMQpj2pCgtaEjx7An+hQe
8Itv+i6h0KD3ARVeCBgdWEgXZTs7zKrmU77xfwIDAQABAoIBABWKpG89wPY4M8CX
PJf2krOve3lfgruWXj1I58lZXdC13Fpj6VWQ0++PZuYVzwC18oiOsmm4tNU7l81E
pdeUuSSyEq7MBGU0iXFzGNfO1Wx5qJWENlEk3dUMRDmFQ7vSS9wOGljrfGyJgTJD
PofWsYYMcZgF1cylNNonM1QZf990hfd0JDfO6CHCloRe/pKIdVzIxQp+3Ju/3OPk
Gw5V+YnVrG4wdZbhOCW2hPp/TLdgFy/xHvrxkEkGx+2ZHGCw9uFj2LRZJwwuaO9p
LDzbyfbFlPWIHdPamdBvenZ6RNTf28+YsbiqwoOk5C286QYb/VDnT8UnG42hXS1I
p3m//qECgYEA7zXmMSBy1tkMQsuaAakOFfl2HfVL2rrW6/CH6BwcCHUD6Wr8wv6a
kPNhI6pqqnP6Xg8XqJXfyIVZOJYPQMQr69zni2y7b3jPOemVGTBSqN7UE71NZkHF
+HZov55bPuX/KD6qc/WAXCyEcISy9TmcA7cEN7ivmyXmbuSXEoiAjlsCgYEA2zgU
mzL6ObJ2555UOqzGCMx6o2KQqOgA1SGmYLBRX77I3fuvGj+DLo6/iuM0FcVV7alG
U/U6qqrSymtdRgeZXHziSVhLZKY/qobgKG2iO1F3DzqyZ94EK/v0XRS4UyiJma3f
lwVG/BcVnv+FKCYUo2JKGln0R8Wcm6D9Nxp0mq0CgYEAn0Dj+oreyZiAqCuCYV6a
SRjmgTVghcNj+HoPEQE9zIeSziBzHKKCZsQRRLxc/RPveBVWK99zt7zHVHvatcSk
dQeBg3olIyZr1+NhZv6b2V9YE7gwwkZBtZOnUwLrPmnCwJlPw5mLFlJw7bP6rHXp
HzQF887Z4lGOIv++cBE+fQcCgYEArF26BhXdHcSvLYsWW1RCGeT9gL4dVFGnZe2h
bmD0er3+Hlyo35CUyuS+wqvG5l9VIxt4CsfFKzBJsZMdsdSDx28CVf0wuqDlamXG
lsMtTkrNvJHAeV7eFN900kNaczhqiQVnys0BdXGJNI1g26Klk5nS/klAg7ZjXxME
RnFswbkCgYBG5OToLXM8pg3yTM9MHMSXFhnnd2MbBK2AySFah2P1V4xv1rJdklU0
9QRTd/hQmYGHioPIF9deU8YSWlj+FBimyoNfJ51YzFyp2maOSJq4Wxe1nv2DflRK
gh5pkl8FizoDnu8BHu1AjOfRQJ3/tCIi2XZJgBuCxyTjd1b6hVUhyg==
-----END RSA PRIVATE KEY-----

View File

@@ -1,22 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIDtTCCAp2gAwIBAgIJAO6wb0FSc/rNMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV
BAYTAlVTMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMTcwMzAzMTUyODAzWhcNMTkxMTI4MTUyODAzWjBF
MQswCQYDVQQGEwJVUzETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
CgKCAQEAsy7Zb3p4yCEnUtPLwqeJrwj9u/ZmcFCrMAktFBx9hG6rY2r7mdB6Bflh
V5cUJXxnsNiDpYcxGhA8kry7pEork1vZ05DyZC9ulVlvxBouVShBcLLwdpaoTGqE
vYtejv6x7ogwMXOjkWWb1WpOv4CVhpeXJ7O/d1uAiYgcUpTpPp4ONG49IAouBHq3
h+o4nVvNfB0J8gaCtTsTZqi1Wt8WYs3XjxGJaKh//ealfRe1kuv40CWQ8gjaC8/1
w9pHdom3Wi/RwfDM3+dVGV6M5lAbPXMB4RK17Hk9P3hlJxJOpKBdgcBJPXtNrTwf
qEWWxk2mB/YVyB84AxjkkNoYyi2ggQIDAQABo4GnMIGkMB0GA1UdDgQWBBRa46Ix
9s9tmMqu+Zz1mocHghm4NTB1BgNVHSMEbjBsgBRa46Ix9s9tmMqu+Zz1mocHghm4
NaFJpEcwRTELMAkGA1UEBhMCVVMxEzARBgNVBAgTClNvbWUtU3RhdGUxITAfBgNV
BAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAO6wb0FSc/rNMAwGA1UdEwQF
MAMBAf8wDQYJKoZIhvcNAQELBQADggEBACdrnM8zb7abxAJsU5WLn1IR0f2+EFA7
ezBEJBM4bn0IZrXuP5ThZ2wieJlshG0C16XN9+zifavHci+AtQwWsB0f/ppHdvWQ
7wt7JN88w+j0DNIYEadRCjWxR3gRAXPgKu3sdyScKFq8MvB49A2EdXRmQSTIM6Fj
teRbE+poxewFT0mhurf3xrtGiSALmv7uAzhRDqpYUzcUlbOGgkyFLYAOOdvZvei+
mfXDi4HKYxgyv53JxBARMdajnCHXM7zQ6Tjc8j1HRtmDQ3XapUB559KfxfODGQq5
zmeoZWU4duxcNXJM0Eiz1CJ39JoWwi8sqaGi/oskuyAh7YKyVTn8xa8=
-----END CERTIFICATE-----

View File

@@ -1,27 +0,0 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAsy7Zb3p4yCEnUtPLwqeJrwj9u/ZmcFCrMAktFBx9hG6rY2r7
mdB6BflhV5cUJXxnsNiDpYcxGhA8kry7pEork1vZ05DyZC9ulVlvxBouVShBcLLw
dpaoTGqEvYtejv6x7ogwMXOjkWWb1WpOv4CVhpeXJ7O/d1uAiYgcUpTpPp4ONG49
IAouBHq3h+o4nVvNfB0J8gaCtTsTZqi1Wt8WYs3XjxGJaKh//ealfRe1kuv40CWQ
8gjaC8/1w9pHdom3Wi/RwfDM3+dVGV6M5lAbPXMB4RK17Hk9P3hlJxJOpKBdgcBJ
PXtNrTwfqEWWxk2mB/YVyB84AxjkkNoYyi2ggQIDAQABAoIBAFgVasxTf3aaXbNo
7JzXMWb7W4iAG2GRNmZZzHA7hTSKFvS7jc3SX3n6WvDtEvlOi8ay2RyRNgEjBDP6
VZ/w2jUJjS5k7dN0Qb9nhPr5B9fS/0CAppcVfsx5/KEVFzniWOPyzQYyW7FJKu8h
4G5hrp/Ie4UH5tKtB6YUZB/wliyyQUkAZdBcoy1hfkOZLAXb1oofArKsiQUHIRA5
th1yyS4cZP8Upngd1EE+d95dFHM2F6iI2lj6DHuu+JxUZ+wKXoNimdG7JniRtIf4
56GoDov83Ey+XbIS6FSQc9nY0ijBDcubl/yP3roCQpE+MZ9BNEo5uj7YmCtAMYLW
TXTNBGUCgYEA4wdkH1NLdub2NcpqwmSA0AtbRvDkt0XTDWWwmuMr/+xPVa4sUKHs
80THQEX/WAZroP6IPbMP6BJhzb53vECukgC65qPxu6M9D1lBGtglxgen4AMu1bKK
gnM8onwARGIo/2ay6qRRZZCxg0TvBky3hbTcIM2zVrnKU6VVyGKHSV8CgYEAygxs
WQYrACv3XN6ZEzyxy08JgjbcnkPWK/m3VPcyHgdEkDu8+nDdUVdbF/js2JWMMx5g
vrPhZ7jVLOXGcLr5mVU4dG5tW5lU0bMy+YYxpEQDiBKlpXgfOsQnakHj7cCZ6bay
mKjJck2oEAQS9bqOJN/Ts5vhOmc8rmhkO7hnAh8CgYEArhVDy9Vl/1WYo6SD+m1w
bJbYtewPpQzwicxZAFuDqKk+KDf3GRkhBWTO2FUUOB4sN3YVaCI+5zf5MPeE/qAm
fCP9LM+3k6bXMkbBamEljdTfACHQruJJ3T+Z1gn5dnZCc5z/QncfRx8NTtfz5MO8
0dTeGnVAuBacs0kLHy2WCUcCgYALNBkl7pOf1NBIlAdE686oCV/rmoMtO3G6yoQB
8BsVUy3YGZfnAy8ifYeNkr3/XHuDsiGHMY5EJBmd/be9NID2oaUZv63MsHnljtw6
vdgu1Z6kgvQwcrK4nXvaBoFPA6kFLp5EnMde0TOKf89VVNzg6pBgmzon9OWGfj9g
mF8N3QKBgQCeoLwxUxpzEA0CPHm7DWF0LefVGllgZ23Eqncdy0QRku5zwwibszbL
sWaR3uDCc3oYcbSGCDVx3cSkvMAJNalc5ZHPfoV9W0+v392/rrExo5iwD8CSoCb2
gFWkeR7PBrD3NzFzFAWyiudzhBKHfRsB0MpCXbJV/WLqTlGIbEypjg==
-----END RSA PRIVATE KEY-----

View File

@@ -23,6 +23,7 @@ async def app_info_dump(app: Sanic, _):
"access_log": app.config.ACCESS_LOG,
"auto_reload": app.auto_reload,
"debug": app.debug,
"noisy_exceptions": app.config.NOISY_EXCEPTIONS,
}
logger.info(json.dumps(app_data))

View File

@@ -45,6 +45,58 @@ def test_server_run(appname):
assert firstline == b"Goin' Fast @ http://127.0.0.1:8000"
@pytest.mark.parametrize(
"cmd",
(
(
"--cert=certs/sanic.example/fullchain.pem",
"--key=certs/sanic.example/privkey.pem",
),
(
"--tls=certs/sanic.example/",
"--tls=certs/localhost/",
),
(
"--tls=certs/sanic.example/",
"--tls=certs/localhost/",
"--tls-strict-host",
),
),
)
def test_tls_options(cmd):
command = ["sanic", "fake.server.app", *cmd, "-p=9999", "--debug"]
out, err, exitcode = capture(command)
assert exitcode != 1
lines = out.split(b"\n")
firstline = lines[6]
assert firstline == b"Goin' Fast @ https://127.0.0.1:9999"
@pytest.mark.parametrize(
"cmd",
(
(
"--cert=certs/sanic.example/fullchain.pem",
),
(
"--cert=certs/sanic.example/fullchain.pem",
"--key=certs/sanic.example/privkey.pem",
"--tls=certs/localhost/",
),
(
"--tls-strict-host",
),
),
)
def test_tls_wrong_options(cmd):
command = ["sanic", "fake.server.app", *cmd, "-p=9999", "--debug"]
out, err, exitcode = capture(command)
assert exitcode == 1
assert not out
errmsg = err.decode().split("sanic: error: ")[1].split("\n")[0]
assert errmsg == "TLS certificates must be specified by either of:"
@pytest.mark.parametrize(
"cmd",
(
@@ -182,3 +234,21 @@ def test_version(cmd):
version_string = f"Sanic {__version__}; Routing {__routing_version__}\n"
assert out == version_string.encode("utf-8")
@pytest.mark.parametrize(
"cmd,expected",
(
("--noisy-exceptions", True),
("--no-noisy-exceptions", False),
),
)
def test_noisy_exceptions(cmd, expected):
command = ["sanic", "fake.server.app", cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
app_info = lines[26]
info = json.loads(app_info)
assert info["noisy_exceptions"] is expected

View File

@@ -2,10 +2,11 @@ import asyncio
import logging
import pytest
from unittest.mock import Mock
from bs4 import BeautifulSoup
from sanic import Sanic
from sanic import Sanic, handlers
from sanic.exceptions import Forbidden, InvalidUsage, NotFound, ServerError
from sanic.handlers import ErrorHandler
from sanic.response import stream, text
@@ -227,3 +228,18 @@ def test_single_arg_exception_handler_notice(exception_handler_app, caplog):
"v22.3, the legacy style lookup method will not work at all."
)
assert response.status == 400
def test_error_handler_noisy_log(exception_handler_app, monkeypatch):
err_logger = Mock()
monkeypatch.setattr(handlers, "error_logger", err_logger)
exception_handler_app.config["NOISY_EXCEPTIONS"] = False
exception_handler_app.test_client.get("/1")
err_logger.exception.assert_not_called()
exception_handler_app.config["NOISY_EXCEPTIONS"] = True
request, _ = exception_handler_app.test_client.get("/1")
err_logger.exception.assert_called_with(
"Exception occurred while handling uri: %s", repr(request.url)
)

View File

@@ -1,6 +1,4 @@
import logging
import os
import sys
import uuid
from importlib import reload
@@ -9,12 +7,9 @@ from unittest.mock import Mock
import pytest
from sanic_testing.testing import SanicTestClient
import sanic
from sanic import Sanic
from sanic.compat import OS_IS_WINDOWS
from sanic.log import LOGGING_CONFIG_DEFAULTS, logger
from sanic.response import text
@@ -155,56 +150,6 @@ async def test_logger(caplog):
assert record in caplog.record_tuples
@pytest.mark.skipif(
OS_IS_WINDOWS and sys.version_info >= (3, 8),
reason="Not testable with current client",
)
def test_logger_static_and_secure(caplog):
# Same as test_logger, except for more coverage:
# - test_client initialised separately for static port
# - using ssl
rand_string = str(uuid.uuid4())
app = Sanic(name=__name__)
@app.get("/")
def log_info(request):
logger.info(rand_string)
return text("hello")
current_dir = os.path.dirname(os.path.realpath(__file__))
ssl_cert = os.path.join(current_dir, "certs/selfsigned.cert")
ssl_key = os.path.join(current_dir, "certs/selfsigned.key")
ssl_dict = {"cert": ssl_cert, "key": ssl_key}
test_client = SanicTestClient(app, port=42101)
with caplog.at_level(logging.INFO):
request, response = test_client.get(
f"https://127.0.0.1:{test_client.port}/",
server_kwargs=dict(ssl=ssl_dict),
)
port = test_client.port
assert caplog.record_tuples[0] == (
"sanic.root",
logging.INFO,
f"Goin' Fast @ https://127.0.0.1:{port}",
)
assert caplog.record_tuples[1] == (
"sanic.root",
logging.INFO,
f"https://127.0.0.1:{port}/",
)
assert caplog.record_tuples[2] == ("sanic.root", logging.INFO, rand_string)
assert caplog.record_tuples[-1] == (
"sanic.root",
logging.INFO,
"Server Stopped",
)
def test_logging_modified_root_logger_config():
# reset_logging()

View File

@@ -1,6 +1,4 @@
import logging
import os
import ssl
from json import dumps as json_dumps
from json import loads as json_loads
@@ -1119,92 +1117,6 @@ async def test_url_attributes_no_ssl_asgi(app, path, query, expected_url):
assert parsed.netloc == request.host
@pytest.mark.parametrize(
"path,query,expected_url",
[
("/foo", "", "https://{}:{}/foo"),
("/bar/baz", "", "https://{}:{}/bar/baz"),
("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"),
],
)
def test_url_attributes_with_ssl_context(app, path, query, expected_url):
current_dir = os.path.dirname(os.path.realpath(__file__))
context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(
os.path.join(current_dir, "certs/selfsigned.cert"),
keyfile=os.path.join(current_dir, "certs/selfsigned.key"),
)
async def handler(request):
return text("OK")
app.add_route(handler, path)
port = app.test_client.port
request, response = app.test_client.get(
f"https://{HOST}:{PORT}" + path + f"?{query}",
server_kwargs={"ssl": context},
)
assert request.url == expected_url.format(HOST, request.server_port)
parsed = urlparse(request.url)
assert parsed.scheme == request.scheme
assert parsed.path == request.path
assert parsed.query == request.query_string
assert parsed.netloc == request.host
@pytest.mark.parametrize(
"path,query,expected_url",
[
("/foo", "", "https://{}:{}/foo"),
("/bar/baz", "", "https://{}:{}/bar/baz"),
("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"),
],
)
def test_url_attributes_with_ssl_dict(app, path, query, expected_url):
current_dir = os.path.dirname(os.path.realpath(__file__))
ssl_cert = os.path.join(current_dir, "certs/selfsigned.cert")
ssl_key = os.path.join(current_dir, "certs/selfsigned.key")
ssl_dict = {"cert": ssl_cert, "key": ssl_key}
async def handler(request):
return text("OK")
app.add_route(handler, path)
request, response = app.test_client.get(
f"https://{HOST}:{PORT}" + path + f"?{query}",
server_kwargs={"ssl": ssl_dict},
)
assert request.url == expected_url.format(HOST, request.server_port)
parsed = urlparse(request.url)
assert parsed.scheme == request.scheme
assert parsed.path == request.path
assert parsed.query == request.query_string
assert parsed.netloc == request.host
def test_invalid_ssl_dict(app):
@app.get("/test")
async def handler(request):
return text("ssl test")
ssl_dict = {"cert": None, "key": None}
with pytest.raises(ValueError) as excinfo:
request, response = app.test_client.get(
"/test", server_kwargs={"ssl": ssl_dict}
)
assert str(excinfo.value) == "SSLContext or certificate and key required."
def test_form_with_multiple_values(app):
@app.route("/", methods=["POST"])
async def handler(request):

378
tests/test_tls.py Normal file
View File

@@ -0,0 +1,378 @@
import logging
import os
import ssl
import uuid
from contextlib import contextmanager
from urllib.parse import urlparse
import pytest
from sanic_testing.testing import HOST, PORT, SanicTestClient
from sanic import Sanic
from sanic.compat import OS_IS_WINDOWS
from sanic.log import logger
from sanic.response import text
current_dir = os.path.dirname(os.path.realpath(__file__))
localhost_dir = os.path.join(current_dir, "certs/localhost")
sanic_dir = os.path.join(current_dir, "certs/sanic.example")
invalid_dir = os.path.join(current_dir, "certs/invalid.nonexist")
localhost_cert = os.path.join(localhost_dir, "fullchain.pem")
localhost_key = os.path.join(localhost_dir, "privkey.pem")
sanic_cert = os.path.join(sanic_dir, "fullchain.pem")
sanic_key = os.path.join(sanic_dir, "privkey.pem")
@contextmanager
def replace_server_name(hostname):
"""Temporarily replace the server name sent with all TLS requests with a fake hostname."""
def hack_wrap_bio(
self,
incoming,
outgoing,
server_side=False,
server_hostname=None,
session=None,
):
return orig_wrap_bio(
self, incoming, outgoing, server_side, hostname, session
)
orig_wrap_bio, ssl.SSLContext.wrap_bio = (
ssl.SSLContext.wrap_bio,
hack_wrap_bio,
)
try:
yield
finally:
ssl.SSLContext.wrap_bio = orig_wrap_bio
@pytest.mark.parametrize(
"path,query,expected_url",
[
("/foo", "", "https://{}:{}/foo"),
("/bar/baz", "", "https://{}:{}/bar/baz"),
("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"),
],
)
def test_url_attributes_with_ssl_context(app, path, query, expected_url):
context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(localhost_cert, localhost_key)
async def handler(request):
return text("OK")
app.add_route(handler, path)
port = app.test_client.port
request, response = app.test_client.get(
f"https://{HOST}:{PORT}" + path + f"?{query}",
server_kwargs={"ssl": context},
)
assert request.url == expected_url.format(HOST, request.server_port)
parsed = urlparse(request.url)
assert parsed.scheme == request.scheme
assert parsed.path == request.path
assert parsed.query == request.query_string
assert parsed.netloc == request.host
@pytest.mark.parametrize(
"path,query,expected_url",
[
("/foo", "", "https://{}:{}/foo"),
("/bar/baz", "", "https://{}:{}/bar/baz"),
("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"),
],
)
def test_url_attributes_with_ssl_dict(app, path, query, expected_url):
ssl_dict = {"cert": localhost_cert, "key": localhost_key}
async def handler(request):
return text("OK")
app.add_route(handler, path)
request, response = app.test_client.get(
f"https://{HOST}:{PORT}" + path + f"?{query}",
server_kwargs={"ssl": ssl_dict},
)
assert request.url == expected_url.format(HOST, request.server_port)
parsed = urlparse(request.url)
assert parsed.scheme == request.scheme
assert parsed.path == request.path
assert parsed.query == request.query_string
assert parsed.netloc == request.host
def test_cert_sni_single(app):
@app.get("/sni")
async def handler(request):
return text(request.conn_info.server_name)
@app.get("/commonname")
async def handler(request):
return text(request.conn_info.cert.get("commonName"))
port = app.test_client.port
request, response = app.test_client.get(
f"https://localhost:{port}/sni",
server_kwargs={"ssl": localhost_dir},
)
assert response.status == 200
assert response.text == "localhost"
request, response = app.test_client.get(
f"https://localhost:{port}/commonname",
server_kwargs={"ssl": localhost_dir},
)
assert response.status == 200
assert response.text == "localhost"
def test_cert_sni_list(app):
ssl_list = [sanic_dir, localhost_dir]
@app.get("/sni")
async def handler(request):
return text(request.conn_info.server_name)
@app.get("/commonname")
async def handler(request):
return text(request.conn_info.cert.get("commonName"))
# This test should match the localhost cert
port = app.test_client.port
request, response = app.test_client.get(
f"https://localhost:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert response.status == 200
assert response.text == "localhost"
request, response = app.test_client.get(
f"https://localhost:{port}/commonname",
server_kwargs={"ssl": ssl_list},
)
assert response.status == 200
assert response.text == "localhost"
# This part should use the sanic.example cert because it matches
with replace_server_name("www.sanic.example"):
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert response.status == 200
assert response.text == "www.sanic.example"
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/commonname",
server_kwargs={"ssl": ssl_list},
)
assert response.status == 200
assert response.text == "sanic.example"
# This part should use the sanic.example cert, that being the first listed
with replace_server_name("invalid.test"):
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert response.status == 200
assert response.text == "invalid.test"
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/commonname",
server_kwargs={"ssl": ssl_list},
)
assert response.status == 200
assert response.text == "sanic.example"
def test_missing_sni(app):
"""The sanic cert does not list 127.0.0.1 and httpx does not send IP as SNI anyway."""
ssl_list = [None, sanic_dir]
@app.get("/sni")
async def handler(request):
return text(request.conn_info.server_name)
port = app.test_client.port
with pytest.raises(Exception) as exc:
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert "Request and response object expected" in str(exc.value)
def test_no_matching_cert(app):
"""The sanic cert does not list 127.0.0.1 and httpx does not send IP as SNI anyway."""
ssl_list = [None, sanic_dir]
@app.get("/sni")
async def handler(request):
return text(request.conn_info.server_name)
port = app.test_client.port
with replace_server_name("invalid.test"):
with pytest.raises(Exception) as exc:
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert "Request and response object expected" in str(exc.value)
def test_wildcards(app):
ssl_list = [None, localhost_dir, sanic_dir]
@app.get("/sni")
async def handler(request):
return text(request.conn_info.server_name)
port = app.test_client.port
with replace_server_name("foo.sanic.test"):
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert response.status == 200
assert response.text == "foo.sanic.test"
with replace_server_name("sanic.test"):
with pytest.raises(Exception) as exc:
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert "Request and response object expected" in str(exc.value)
with replace_server_name("sub.foo.sanic.test"):
with pytest.raises(Exception) as exc:
request, response = app.test_client.get(
f"https://127.0.0.1:{port}/sni",
server_kwargs={"ssl": ssl_list},
)
assert "Request and response object expected" in str(exc.value)
def test_invalid_ssl_dict(app):
@app.get("/test")
async def handler(request):
return text("ssl test")
ssl_dict = {"cert": None, "key": None}
with pytest.raises(ValueError) as excinfo:
request, response = app.test_client.get(
"/test", server_kwargs={"ssl": ssl_dict}
)
assert str(excinfo.value) == "SSL dict needs filenames for cert and key."
def test_invalid_ssl_type(app):
@app.get("/test")
async def handler(request):
return text("ssl test")
with pytest.raises(ValueError) as excinfo:
request, response = app.test_client.get(
"/test", server_kwargs={"ssl": False}
)
assert "Invalid ssl argument" in str(excinfo.value)
def test_cert_file_on_pathlist(app):
@app.get("/test")
async def handler(request):
return text("ssl test")
ssl_list = [sanic_cert]
with pytest.raises(ValueError) as excinfo:
request, response = app.test_client.get(
"/test", server_kwargs={"ssl": ssl_list}
)
assert "folder expected" in str(excinfo.value)
assert sanic_cert in str(excinfo.value)
def test_missing_cert_path(app):
@app.get("/test")
async def handler(request):
return text("ssl test")
ssl_list = [invalid_dir]
with pytest.raises(ValueError) as excinfo:
request, response = app.test_client.get(
"/test", server_kwargs={"ssl": ssl_list}
)
assert "not found" in str(excinfo.value)
assert invalid_dir + "/privkey.pem" in str(excinfo.value)
def test_missing_cert_file(app):
@app.get("/test")
async def handler(request):
return text("ssl test")
invalid2 = invalid_dir.replace("nonexist", "certmissing")
ssl_list = [invalid2]
with pytest.raises(ValueError) as excinfo:
request, response = app.test_client.get(
"/test", server_kwargs={"ssl": ssl_list}
)
assert "not found" in str(excinfo.value)
assert invalid2 + "/fullchain.pem" in str(excinfo.value)
def test_no_certs_on_list(app):
@app.get("/test")
async def handler(request):
return text("ssl test")
ssl_list = [None]
with pytest.raises(ValueError) as excinfo:
request, response = app.test_client.get(
"/test", server_kwargs={"ssl": ssl_list}
)
assert "No certificates" in str(excinfo.value)
def test_logger_vhosts(caplog):
app = Sanic(name=__name__)
@app.after_server_start
def stop(*args):
app.stop()
with caplog.at_level(logging.INFO):
app.run(host="127.0.0.1", port=42102, ssl=[localhost_dir, sanic_dir])
logmsg = [
m for s, l, m in caplog.record_tuples if m.startswith("Certificate")
][0]
assert logmsg == (
"Certificate vhosts: localhost, 127.0.0.1, 0:0:0:0:0:0:0:1, sanic.example, www.sanic.example, *.sanic.test, 2001:DB8:0:0:0:0:0:541C"
)