Compare commits
3 Commits
tlspath
...
response-e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8027563144 | ||
|
|
875e921bda | ||
|
|
11c841ab4e |
@@ -3,7 +3,6 @@ from __future__ import annotations
|
||||
import os
|
||||
import ssl
|
||||
|
||||
from pathlib import Path, PurePath
|
||||
from typing import Any, Dict, Iterable, Optional, Union
|
||||
|
||||
from sanic.log import logger
|
||||
@@ -40,23 +39,23 @@ def create_context(
|
||||
|
||||
|
||||
def shorthand_to_ctx(
|
||||
ctxdef: Union[None, ssl.SSLContext, dict, PurePath, str]
|
||||
ctxdef: Union[None, ssl.SSLContext, dict, str]
|
||||
) -> Optional[ssl.SSLContext]:
|
||||
"""Convert an ssl argument shorthand to an SSLContext object."""
|
||||
if ctxdef is None or isinstance(ctxdef, ssl.SSLContext):
|
||||
return ctxdef
|
||||
if isinstance(ctxdef, (PurePath, str)):
|
||||
return load_cert_dir(Path(ctxdef))
|
||||
if isinstance(ctxdef, str):
|
||||
return load_cert_dir(ctxdef)
|
||||
if isinstance(ctxdef, dict):
|
||||
return CertSimple(**ctxdef)
|
||||
raise ValueError(
|
||||
f"Invalid ssl argument {type(ctxdef)}."
|
||||
" Expecting one/list of: certdir | dict | SSLContext"
|
||||
" Expecting a list of certdirs, a dict or an SSLContext."
|
||||
)
|
||||
|
||||
|
||||
def process_to_context(
|
||||
ssldef: Union[None, ssl.SSLContext, dict, PurePath, str, list, tuple]
|
||||
ssldef: Union[None, ssl.SSLContext, dict, str, list, tuple]
|
||||
) -> Optional[ssl.SSLContext]:
|
||||
"""Process app.run ssl argument from easy formats to full SSLContext."""
|
||||
return (
|
||||
@@ -66,11 +65,11 @@ def process_to_context(
|
||||
)
|
||||
|
||||
|
||||
def load_cert_dir(p: Path) -> ssl.SSLContext:
|
||||
if p.is_file():
|
||||
def load_cert_dir(p: str) -> ssl.SSLContext:
|
||||
if os.path.isfile(p):
|
||||
raise ValueError(f"Certificate folder expected but {p} is a file.")
|
||||
keyfile = p / "privkey.pem"
|
||||
certfile = p / "fullchain.pem"
|
||||
keyfile = os.path.join(p, "privkey.pem")
|
||||
certfile = os.path.join(p, "fullchain.pem")
|
||||
if not os.access(keyfile, os.R_OK):
|
||||
raise ValueError(
|
||||
f"Certificate not found or permission denied {keyfile}"
|
||||
|
||||
@@ -24,7 +24,6 @@ from sanic.helpers import (
|
||||
Default,
|
||||
_default,
|
||||
has_message_body,
|
||||
remove_entity_headers,
|
||||
)
|
||||
from sanic.http import Http
|
||||
|
||||
@@ -106,9 +105,6 @@ class BaseHTTPResponse:
|
||||
Returns:
|
||||
Iterator[Tuple[bytes, bytes]]: A list of header tuples encoded in bytes for sending
|
||||
""" # noqa: E501
|
||||
# TODO: Make a blacklist set of header names and then filter with that
|
||||
if self.status in (304, 412): # Not Modified, Precondition Failed
|
||||
self.headers = remove_entity_headers(self.headers)
|
||||
if has_message_body(self.status):
|
||||
self.headers.setdefault("content-type", self.content_type)
|
||||
# Encode headers into bytes
|
||||
|
||||
@@ -178,6 +178,10 @@ def json_app(app):
|
||||
async def unmodified_handler(request: Request):
|
||||
return json(JSON_DATA, status=304)
|
||||
|
||||
@app.get("/precondition")
|
||||
async def precondition_handler(request: Request):
|
||||
return json(JSON_DATA, status=412)
|
||||
|
||||
@app.delete("/")
|
||||
async def delete_handler(request: Request):
|
||||
return json(None, status=204)
|
||||
@@ -193,6 +197,10 @@ def test_json_response(json_app):
|
||||
assert response.text == json_dumps(JSON_DATA)
|
||||
assert response.json == JSON_DATA
|
||||
|
||||
request, response = json_app.test_client.get("/precondition")
|
||||
assert response.status == 412
|
||||
assert response.json == JSON_DATA
|
||||
|
||||
|
||||
def test_no_content(json_app):
|
||||
request, response = json_app.test_client.get("/no-content")
|
||||
|
||||
Reference in New Issue
Block a user