Compare commits

..

16 Commits

Author SHA1 Message Date
L. Kärkkäinen
fd2e4819d1 Merge branch 'main' into accept-enhance 2023-02-05 18:53:34 +00:00
L. Karkkainen
0e024b46d9 black 2023-02-05 16:40:04 +00:00
L. Karkkainen
eae58e5d2a Minor cleanup. 2023-02-05 16:37:14 +00:00
L. Karkkainen
6472a69fbf More specific naming: mime is simple str, media_type may have q and raw is header component. 2023-02-05 16:28:32 +00:00
L. Karkkainen
2e2231919c Updated/removed tests due toe accept/mediatype complete API and semantics change. 2023-01-30 02:24:12 +00:00
L. Karkkainen
8da10a9c0c Compatibility with older version. 2023-01-30 02:23:26 +00:00
L. Karkkainen
ec25581262 Accept header choose() function removed and replaced by a more versatile match(). 2023-01-30 01:04:14 +00:00
L. Karkkainen
b8ae4285a4 Move all errorpages work to another branch error-format-redux. 2023-01-29 03:11:27 +00:00
L. Karkkainen
c0ca55530e Add back JSON detection by request body, but to be deprecated. 2023-01-29 03:03:26 +00:00
L. Karkkainen
52ecbb9dc7 Note that base renderer can be changed. 2023-01-29 03:01:26 +00:00
L. Karkkainen
3ef99568a5 Refactor acceptable check to a helper function. 2023-01-29 03:00:04 +00:00
L. Karkkainen
dfe2148333 Remove dubious or unnecessary handler types of response mapping. 2023-01-29 01:59:24 +00:00
L. Karkkainen
7909f673e5 Handle empty/missing accept header more directly 2023-01-29 01:52:53 +00:00
L. Karkkainen
e35286e332 Rethinking of renderer selection logic, cleanup. 2023-01-29 01:43:40 +00:00
L. Karkkainen
8eeb1c20dc Unfinished hacks, moving to another machine. 2023-01-29 00:04:39 +00:00
Adam Hopkins
43c9a0a49b Additional accept functionality 2023-01-25 00:13:44 +02:00
29 changed files with 1571 additions and 2158 deletions

View File

@@ -24,6 +24,5 @@ module = [
"sanic_routing.*",
"aioquic.*",
"html5tagger.*",
"tracerite.*",
]
ignore_missing_imports = true

View File

@@ -1,10 +1 @@
__version__ = "23.3.0"
__compatibility__ = "22.12"
from inspect import currentframe, stack
for frame_info in stack():
if frame_info.frame is not currentframe():
value = frame_info.frame.f_globals.get("__SANIC_COMPATIBILITY__")
if value:
__compatibility__ = value
__version__ = "22.12.0"

View File

@@ -875,8 +875,6 @@ class Sanic(StaticHandleMixin, BaseSanic, StartupMixin, metaclass=TouchUpMeta):
:param request: HTTP Request object
:return: Nothing
"""
__tracebackhide__ = True
await self.dispatch(
"http.lifecycle.handle",
inline=True,

View File

@@ -40,7 +40,7 @@ FULL_COLOR_LOGO = """
""" # noqa
SVG_LOGO_SIMPLE = """<svg id=logo-simple viewBox="0 0 964 279"><desc>Sanic</desc><path d="M107 222c9-2 10-20 1-22s-20-2-30-2-17 7-16 14 6 10 15 10h30zm115-1c16-2 30-11 35-23s6-24 2-33-6-14-15-20-24-11-38-10c-7 3-10 13-5 19s17-1 24 4 15 14 13 24-5 15-14 18-50 0-74 0h-17c-6 4-10 15-4 20s16 2 23 3zM251 83q9-1 9-7 0-15-10-16h-13c-10 6-10 20 0 22zM147 60c-4 0-10 3-11 11s5 13 10 12 42 0 67 0c5-3 7-10 6-15s-4-8-9-8zm-33 1c-8 0-16 0-24 3s-20 10-25 20-6 24-4 36 15 22 26 27 78 8 94 3c4-4 4-12 0-18s-69 8-93-10c-8-7-9-23 0-30s12-10 20-10 12 2 16-3 1-15-5-18z" fill="#ff0d68"/><path d="M676 74c0-14-18-9-20 0s0 30 0 39 20 9 20 2zm-297-10c-12 2-15 12-23 23l-41 58H340l22-30c8-12 23-13 30-4s20 24 24 38-10 10-17 10l-68 2q-17 1-48 30c-7 6-10 20 0 24s15-8 20-13 20 -20 58-21h50 c20 2 33 9 52 30 8 10 24-4 16-13L384 65q-3-2-5-1zm131 0c-10 1-12 12-11 20v96c1 10-3 23 5 32s20-5 17-15c0-23-3-46 2-67 5-12 22-14 32-5l103 87c7 5 19 1 18-9v-64c-3-10-20-9-21 2s-20 22-30 13l-97-80c-5-4-10-10-18-10zM701 76v128c2 10 15 12 20 4s0-102 0-124s-20-18-20-7z M850 63c-35 0-69-2-86 15s-20 60-13 66 13 8 16 0 1-10 1-27 12-26 20-32 66-5 85-5 31 4 31-10-18-7-54-7M764 159c-6-2-15-2-16 12s19 37 33 43 23 8 25-4-4-11-11-14q-9-3-22-18c-4-7-3-16-10-19zM828 196c-4 0-8 1-10 5s-4 12 0 15 8 2 12 2h60c5 0 10-2 12-6 3-7-1-16-8-16z" fill="#1f1f1f"/></svg>""" # noqa
SVG_LOGO = """<svg id=logo alt=Sanic viewBox="0 0 964 279"><path d="M107 222c9-2 10-20 1-22s-20-2-30-2-17 7-16 14 6 10 15 10h30zm115-1c16-2 30-11 35-23s6-24 2-33-6-14-15-20-24-11-38-10c-7 3-10 13-5 19s17-1 24 4 15 14 13 24-5 15-14 18-50 0-74 0h-17c-6 4-10 15-4 20s16 2 23 3zM251 83q9-1 9-7 0-15-10-16h-13c-10 6-10 20 0 22zM147 60c-4 0-10 3-11 11s5 13 10 12 42 0 67 0c5-3 7-10 6-15s-4-8-9-8zm-33 1c-8 0-16 0-24 3s-20 10-25 20-6 24-4 36 15 22 26 27 78 8 94 3c4-4 4-12 0-18s-69 8-93-10c-8-7-9-23 0-30s12-10 20-10 12 2 16-3 1-15-5-18z" fill="#ff0d68"/><path d="M676 74c0-14-18-9-20 0s0 30 0 39 20 9 20 2zm-297-10c-12 2-15 12-23 23l-41 58H340l22-30c8-12 23-13 30-4s20 24 24 38-10 10-17 10l-68 2q-17 1-48 30c-7 6-10 20 0 24s15-8 20-13 20 -20 58-21h50 c20 2 33 9 52 30 8 10 24-4 16-13L384 65q-3-2-5-1zm131 0c-10 1-12 12-11 20v96c1 10-3 23 5 32s20-5 17-15c0-23-3-46 2-67 5-12 22-14 32-5l103 87c7 5 19 1 18-9v-64c-3-10-20-9-21 2s-20 22-30 13l-97-80c-5-4-10-10-18-10zM701 76v128c2 10 15 12 20 4s0-102 0-124s-20-18-20-7z M850 63c-35 0-69-2-86 15s-20 60-13 66 13 8 16 0 1-10 1-27 12-26 20-32 66-5 85-5 31 4 31-10-18-7-54-7M764 159c-6-2-15-2-16 12s19 37 33 43 23 8 25-4-4-11-11-14q-9-3-22-18c-4-7-3-16-10-19zM828 196c-4 0-8 1-10 5s-4 12 0 15 8 2 12 2h60c5 0 10-2 12-6 3-7-1-16-8-16z" fill="#e1e1e1"/></svg>""" # noqa
ansi_pattern = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

View File

@@ -1,18 +0,0 @@
from sanic.__version__ import __compatibility__
if __compatibility__ == "22.12":
from .v22_12.request import (
File,
Request,
RequestParameters,
parse_multipart_form,
)
elif __compatibility__ == "23.3":
from .v23_3.request import (
File,
Request,
RequestParameters,
parse_multipart_form,
)
else:
raise RuntimeError(f"Unknown compatibility value: {__compatibility__}")

File diff suppressed because it is too large Load Diff

View File

@@ -1,9 +0,0 @@
from ..v22_12.request import File
from ..v22_12.request import Request as LegacyRequest
from ..v22_12.request import RequestParameters, parse_multipart_form
class Request(LegacyRequest):
@property
def something_new(self):
return 123

View File

@@ -88,12 +88,6 @@ class Header(CIMultiDict):
very similar to a regular dictionary.
"""
def __getattr__(self, key: str) -> str:
if key.startswith("_"):
return self.__getattribute__(key)
key = key.rstrip("_").replace("_", "-")
return ",".join(self.getall(key, default=[]))
def get_all(self, key: str):
"""
Convenience method mapped to ``getall()``.

View File

@@ -22,8 +22,6 @@ from traceback import extract_tb
from sanic.exceptions import BadRequest, SanicException
from sanic.helpers import STATUS_CODES
from sanic.log import deprecation, logger
from sanic.pages.error import ErrorPage
from sanic.response import html, json, text
@@ -39,11 +37,11 @@ if t.TYPE_CHECKING:
from sanic import HTTPResponse, Request
DEFAULT_FORMAT = "auto"
FALLBACK_TEXT = """\
The application encountered an unexpected error and could not continue.\
"""
FALLBACK_TEXT = (
"The server encountered an internal error and "
"cannot complete your request."
)
FALLBACK_STATUS = 500
JSON = "application/json"
class BaseRenderer:
@@ -117,18 +115,134 @@ class HTMLRenderer(BaseRenderer):
The default fallback type.
"""
TRACEBACK_STYLE = """
html { font-family: sans-serif }
h2 { color: #888; }
.tb-wrapper p, dl, dd { margin: 0 }
.frame-border { margin: 1rem }
.frame-line > *, dt, dd { padding: 0.3rem 0.6rem }
.frame-line, dl { margin-bottom: 0.3rem }
.frame-code, dd { font-size: 16px; padding-left: 4ch }
.tb-wrapper, dl { border: 1px solid #eee }
.tb-header,.obj-header {
background: #eee; padding: 0.3rem; font-weight: bold
}
.frame-descriptor, dt { background: #e2eafb; font-size: 14px }
"""
TRACEBACK_WRAPPER_HTML = (
"<div class=tb-header>{exc_name}: {exc_value}</div>"
"<div class=tb-wrapper>{frame_html}</div>"
)
TRACEBACK_BORDER = (
"<div class=frame-border>"
"The above exception was the direct cause of the following exception:"
"</div>"
)
TRACEBACK_LINE_HTML = (
"<div class=frame-line>"
"<p class=frame-descriptor>"
"File {0.filename}, line <i>{0.lineno}</i>, "
"in <code><b>{0.name}</b></code>"
"<p class=frame-code><code>{0.line}</code>"
"</div>"
)
OBJECT_WRAPPER_HTML = (
"<div class=obj-header>{title}</div>"
"<dl class={obj_type}>{display_html}</dl>"
)
OBJECT_DISPLAY_HTML = "<dt>{key}</dt><dd><code>{value}</code></dd>"
OUTPUT_HTML = (
"<!DOCTYPE html><html lang=en>"
"<meta charset=UTF-8><title>{title}</title>\n"
"<style>{style}</style>\n"
"<h1>{title}</h1><p>{text}\n"
"{body}"
)
def full(self) -> HTTPResponse:
page = ErrorPage(
debug=self.debug,
title=super().title,
text=super().text,
request=self.request,
exc=self.exception,
return html(
self.OUTPUT_HTML.format(
title=self.title,
text=self.text,
style=self.TRACEBACK_STYLE,
body=self._generate_body(full=True),
),
status=self.status,
)
return html(page.render(), status=self.status, headers=self.headers)
def minimal(self) -> HTTPResponse:
return self.full()
return html(
self.OUTPUT_HTML.format(
title=self.title,
text=self.text,
style=self.TRACEBACK_STYLE,
body=self._generate_body(full=False),
),
status=self.status,
headers=self.headers,
)
@property
def text(self):
return escape(super().text)
@property
def title(self):
return escape(f"⚠️ {super().title}")
def _generate_body(self, *, full):
lines = []
if full:
_, exc_value, __ = sys.exc_info()
exceptions = []
while exc_value:
exceptions.append(self._format_exc(exc_value))
exc_value = exc_value.__cause__
traceback_html = self.TRACEBACK_BORDER.join(reversed(exceptions))
appname = escape(self.request.app.name)
name = escape(self.exception.__class__.__name__)
value = escape(self.exception)
path = escape(self.request.path)
lines += [
f"<h2>Traceback of {appname} " "(most recent call last):</h2>",
f"{traceback_html}",
"<div class=summary><p>",
f"<b>{name}: {value}</b> "
f"while handling path <code>{path}</code>",
"</div>",
]
for attr, display in (("context", True), ("extra", bool(full))):
info = getattr(self.exception, attr, None)
if info and display:
lines.append(self._generate_object_display(info, attr))
return "\n".join(lines)
def _generate_object_display(
self, obj: t.Dict[str, t.Any], descriptor: str
) -> str:
display = "".join(
self.OBJECT_DISPLAY_HTML.format(key=key, value=value)
for key, value in obj.items()
)
return self.OBJECT_WRAPPER_HTML.format(
title=descriptor.title(),
display_html=display,
obj_type=descriptor.lower(),
)
def _format_exc(self, exc):
frames = extract_tb(exc.__traceback__)
frame_html = "".join(
self.TRACEBACK_LINE_HTML.format(frame) for frame in frames
)
return self.TRACEBACK_WRAPPER_HTML.format(
exc_name=escape(exc.__class__.__name__),
exc_value=escape(exc),
frame_html=frame_html,
)
class TextRenderer(BaseRenderer):
@@ -276,18 +390,21 @@ def escape(text):
return f"{text}".replace("&", "&amp;").replace("<", "&lt;")
MIME_BY_CONFIG = {
"text": "text/plain",
"json": "application/json",
"html": "text/html",
RENDERERS_BY_CONFIG = {
"html": HTMLRenderer,
"json": JSONRenderer,
"text": TextRenderer,
}
CONFIG_BY_MIME = {v: k for k, v in MIME_BY_CONFIG.items()}
RENDERERS_BY_CONTENT_TYPE = {
"text/plain": TextRenderer,
"application/json": JSONRenderer,
"multipart/form-data": HTMLRenderer,
"text/html": HTMLRenderer,
}
CONTENT_TYPE_BY_RENDERERS = {
v: k for k, v in RENDERERS_BY_CONTENT_TYPE.items()
}
# Handler source code is checked for which response types it returns with the
# route error_format="auto" (default) to determine which format to use.
@@ -303,7 +420,7 @@ RESPONSE_MAPPING = {
def check_error_format(format):
if format not in MIME_BY_CONFIG and format != "auto":
if format not in RENDERERS_BY_CONFIG and format != "auto":
raise SanicException(f"Unknown format: {format}")
@@ -318,68 +435,98 @@ def exception_response(
"""
Render a response for the default FALLBACK exception handler.
"""
content_type = None
if not renderer:
mt = guess_mime(request, fallback)
renderer = RENDERERS_BY_CONTENT_TYPE.get(mt, base)
# Make sure we have something set
renderer = base
render_format = fallback
if request:
# If there is a request, try and get the format
# from the route
if request.route:
try:
if request.route.extra.error_format:
render_format = request.route.extra.error_format
except AttributeError:
...
content_type = request.headers.getone("content-type", "").split(
";"
)[0]
acceptable = request.accept
# If the format is auto still, make a guess
if render_format == "auto":
# First, if there is an Accept header, check if text/html
# is the first option
# According to MDN Web Docs, all major browsers use text/html
# as the primary value in Accept (with the exception of IE 8,
# and, well, if you are supporting IE 8, then you have bigger
# problems to concern yourself with than what default exception
# renderer is used)
# Source:
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Content_negotiation/List_of_default_Accept_values
if acceptable and acceptable[0].match(
"text/html",
allow_type_wildcard=False,
allow_subtype_wildcard=False,
):
renderer = HTMLRenderer
# Second, if there is an Accept header, check if
# application/json is an option, or if the content-type
# is application/json
elif (
acceptable
and acceptable.match(
"application/json",
allow_type_wildcard=False,
allow_subtype_wildcard=False,
)
or content_type == "application/json"
):
renderer = JSONRenderer
# Third, if there is no Accept header, assume we want text.
# The likely use case here is a raw socket.
elif not acceptable:
renderer = TextRenderer
else:
# Fourth, look to see if there was a JSON body
# When in this situation, the request is probably coming
# from curl, an API client like Postman or Insomnia, or a
# package like requests or httpx
try:
# Give them the benefit of the doubt if they did:
# $ curl localhost:8000 -d '{"foo": "bar"}'
# And provide them with JSONRenderer
renderer = JSONRenderer if request.json else base
except BadRequest:
renderer = base
else:
renderer = RENDERERS_BY_CONFIG.get(render_format, renderer)
# Lastly, if there is an Accept header, make sure
# our choice is okay
if acceptable:
type_ = CONTENT_TYPE_BY_RENDERERS.get(renderer) # type: ignore
if type_ and type_ not in acceptable:
# If the renderer selected is not in the Accept header
# look through what is in the Accept header, and select
# the first option that matches. Otherwise, just drop back
# to the original default
for accept in acceptable:
mtype = f"{accept.type_}/{accept.subtype}"
maybe = RENDERERS_BY_CONTENT_TYPE.get(mtype)
if maybe:
renderer = maybe
break
else:
renderer = base
renderer = t.cast(t.Type[BaseRenderer], renderer)
return renderer(request, exception, debug).render()
def guess_mime(req: Request, fallback: str) -> str:
# Attempt to find a suitable MIME format for the response.
# Insertion-ordered map of formats["html"] = "source of that suggestion"
formats = {}
name = ""
# Route error_format (by magic from handler code if auto, the default)
if req.route:
name = req.route.name
f = req.route.extra.error_format
if f in MIME_BY_CONFIG:
formats[f] = name
if not formats and fallback in MIME_BY_CONFIG:
formats[fallback] = "FALLBACK_ERROR_FORMAT"
# If still not known, check for the request for clues of JSON
if not formats and fallback == "auto" and req.accept.match(JSON):
if JSON in req.accept: # Literally, not wildcard
formats["json"] = "request.accept"
elif JSON in req.headers.getone("content-type", ""):
formats["json"] = "content-type"
# DEPRECATION: Remove this block in 24.3
else:
c = None
try:
c = req.json
except BadRequest:
pass
if c:
formats["json"] = "request.json"
deprecation(
"Response type was determined by the JSON content of "
"the request. This behavior is deprecated and will be "
"removed in v24.3. Please specify the format either by\n"
f' error_format="json" on route {name}, by\n'
' FALLBACK_ERROR_FORMAT = "json", or by adding header\n'
" accept: application/json to your requests.",
24.3,
)
# Any other supported formats
if fallback == "auto":
for k in MIME_BY_CONFIG:
if k not in formats:
formats[k] = "any"
mimes = [MIME_BY_CONFIG[k] for k in formats]
m = req.accept.match(*mimes)
if m:
format = CONFIG_BY_MIME[m.mime]
source = formats[format]
logger.debug(
f"The client accepts {m.header}, using '{format}' from {source}"
)
else:
logger.debug(f"No format found, the client accepts {req.accept!r}")
return m.mime

View File

@@ -19,6 +19,7 @@ OptionsIterable = Iterable[Tuple[str, str]] # May contain duplicate keys
_token, _quoted = r"([\w!#$%&'*+\-.^_`|~]+)", r'"([^"]*)"'
_param = re.compile(rf";\s*{_token}=(?:{_token}|{_quoted})", re.ASCII)
_firefox_quote_escape = re.compile(r'\\"(?!; |\s*$)')
_ipv6 = "(?:[0-9A-Fa-f]{0,4}:){2,7}[0-9A-Fa-f]{0,4}"
_ipv6_re = re.compile(_ipv6)
_host_re = re.compile(
@@ -32,6 +33,15 @@ _host_re = re.compile(
# For more information, consult ../tests/test_requests.py
def parse_arg_as_accept(f):
def func(self, other, *args, **kwargs):
if not isinstance(other, MediaType) and other:
other = MediaType._parse(other)
return f(self, other, *args, **kwargs)
return func
class MediaType:
"""A media type, as used in the Accept header."""
@@ -41,67 +51,57 @@ class MediaType:
subtype: str,
**params: str,
):
self.type = type_
self.type_ = type_
self.subtype = subtype
self.q = float(params.get("q", "1.0"))
self.params = params
self.mime = f"{type_}/{subtype}"
self.key = (
-1 * self.q,
-1 * len(self.params),
self.subtype == "*",
self.type == "*",
)
def __repr__(self):
return self.mime + "".join(f";{k}={v}" for k, v in self.params.items())
def __eq__(self, other):
"""Check for mime (str or MediaType) identical type/subtype.
Parameters such as q are not considered."""
"""Check for mime (str or MediaType) identical type/subtype."""
if isinstance(other, str):
# Give a friendly reminder if str contains parameters
if ";" in other:
raise ValueError("Use match() to compare with parameters")
return self.mime == other
if isinstance(other, MediaType):
# Ignore parameters silently with MediaType objects
return self.mime == other.mime
return NotImplemented
def match(
self,
mime_with_params: Union[str, MediaType],
mime: str,
allow_type_wildcard=True,
allow_subtype_wildcard=True,
) -> Optional[MediaType]:
"""Check if this media type matches the given mime type/subtype.
Wildcards are supported both ways on both type and subtype.
If mime contains a semicolon, optionally followed by parameters,
the parameters of the two media types must match exactly.
Note: Use the `==` operator instead to check for literal matches
without expanding wildcards.
@param media_type: A type/subtype string to match.
@return `self` if the media types are compatible, else `None`
"""
mt = (
MediaType._parse(mime_with_params)
if isinstance(mime_with_params, str)
else mime_with_params
)
mt = MediaType._parse(mime)
return (
self
if (
mt
# All parameters given in the other media type must match
and all(self.params.get(k) == v for k, v in mt.params.items())
# Subtype match
and (
self.subtype == mt.subtype
or self.subtype == "*"
or mt.subtype == "*"
)
(self.subtype in (mt.subtype, "*") or mt.subtype == "*")
# Type match
and (self.type_ in (mt.type_, "*") or mt.type_ == "*")
# Allow disabling wildcards (backwards compatibility with tests)
and (
self.type == mt.type or self.type == "*" or mt.type == "*"
allow_type_wildcard
or self.type_ != "*"
and mt.type_ != "*"
)
and (
allow_subtype_wildcard
or self.subtype != "*"
and mt.subtype != "*"
)
)
else None
@@ -110,16 +110,19 @@ class MediaType:
@property
def has_wildcard(self) -> bool:
"""Return True if this media type has a wildcard in it."""
return any(part == "*" for part in (self.subtype, self.type))
return "*" in (self.subtype, self.type_)
@property
def is_wildcard(self) -> bool:
"""Return True if this is the wildcard `*/*`"""
return self.type_ == "*" and self.subtype == "*"
@classmethod
def _parse(cls, mime_with_params: str) -> Optional[MediaType]:
def _parse(cls, mime_with_params: str) -> MediaType:
mtype = mime_with_params.strip()
if "/" not in mime_with_params:
return None
mime, *raw_params = mtype.split(";")
type_, subtype = mime.split("/", 1)
media, *raw_params = mtype.split(";")
type_, subtype = media.split("/", 1)
if not type_ or not subtype:
raise ValueError(f"Invalid media type: {mtype}")
@@ -133,63 +136,17 @@ class MediaType:
return cls(type_.lstrip(), subtype.rstrip(), **params)
class Matched:
"""A matching result of a MIME string against a header."""
class Matched(str):
"""A matching result of a MIME string against a MediaType."""
def __init__(self, mime: str, header: Optional[MediaType]):
self.mime = mime
self.header = header
def __new__(cls, mime: str, m: Optional[MediaType]):
return super().__new__(cls, mime)
def __init__(self, mime: str, m: Optional[MediaType]):
self.m = m
def __repr__(self):
return f"<{self} matched {self.header}>" if self else "<no match>"
def __str__(self):
return self.mime
def __bool__(self):
return self.header is not None
def __eq__(self, other: Any) -> bool:
try:
comp, other_accept = self._compare(other)
except TypeError:
return False
return bool(
comp
and (
(self.header and other_accept.header)
or (not self.header and not other_accept.header)
)
)
def _compare(self, other) -> Tuple[bool, Matched]:
if isinstance(other, str):
parsed = Matched.parse(other)
if self.mime == other:
return True, parsed
other = parsed
if isinstance(other, Matched):
return self.header == other.header, other
raise TypeError(
"Comparison not supported between unequal "
f"mime types of '{self.mime}' and '{other}'"
)
def match(self, other: Union[str, Matched]) -> Optional[Matched]:
accept = Matched.parse(other) if isinstance(other, str) else other
if not self.header or not accept.header:
return None
if self.header.match(accept.header):
return accept
return None
@classmethod
def parse(cls, raw: str) -> Matched:
media_type = MediaType._parse(raw)
return cls(raw, media_type)
return f"<{self} matched {self.m}>" if self else "<no match>"
class AcceptList(list):
@@ -204,15 +161,14 @@ class AcceptList(list):
- operator 'in' for checking explicit matches (wildcards as literals)
"""
def match(self, *mimes: str, accept_wildcards=True) -> Matched:
def match(self, *mimes: str) -> Matched:
"""Find a media type accepted by the client.
This method can be used to find which of the media types requested by
the client is most preferred against the ones given as arguments.
The ordering of preference is set by:
1. The order set by RFC 7231, s. 5.3.2, giving a higher priority
to q values and more specific type definitions,
1. The q values on the Accept header, and those being equal,
2. The order of the arguments (first is most preferred), and
3. The first matching entry on the Accept header.
@@ -225,41 +181,29 @@ class AcceptList(list):
header entry `MediaType` or `None` is available as the `m` attribute.
@param mimes: Any MIME types to search for in order of preference.
@param accept_wildcards: Match Accept entries with wildcards in them.
@return A match object with the mime string and the MediaType object.
"""
a = sorted(
(-acc.q, i, j, mime, acc)
for j, acc in enumerate(self)
if accept_wildcards or not acc.has_wildcard
for i, mime in enumerate(mimes)
if acc.match(mime)
l = sorted(
[
(-acc.q, i, j, mime, acc) # Sort by -q, i, j
for j, acc in enumerate(self)
for i, mime in enumerate(mimes)
if acc.match(mime)
]
)
return Matched(*(a[0][-2:] if a else ("", None)))
def __str__(self):
"""Format as Accept header value (parsed, not original)."""
return ", ".join(str(m) for m in self)
return Matched(*(l[0][3:] if l else ("", None)))
def parse_accept(accept: Optional[str]) -> AcceptList:
def parse_accept(accept: str) -> AcceptList:
"""Parse an Accept header and order the acceptable media types in
according to RFC 7231, s. 5.3.2
accorsing to RFC 7231, s. 5.3.2
https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2
"""
if not accept:
if accept == "":
return AcceptList() # Empty header, accept nothing
accept = "*/*" # No header means that all types are accepted
return AcceptList()
try:
a = [
mt
for mt in [MediaType._parse(mtype) for mtype in accept.split(",")]
if mt
]
if not a:
raise ValueError
return AcceptList(sorted(a, key=lambda x: x.key))
a = [MediaType._parse(mtype) for mtype in accept.split(",")]
return AcceptList(sorted(a, key=lambda mtype: -mtype.q))
except ValueError:
raise InvalidHeader(f"Invalid header value in Accept: {accept}")
@@ -267,23 +211,19 @@ def parse_accept(accept: Optional[str]) -> AcceptList:
def parse_content_header(value: str) -> Tuple[str, Options]:
"""Parse content-type and content-disposition header values.
E.g. `form-data; name=upload; filename="file.txt"` to
E.g. 'form-data; name=upload; filename=\"file.txt\"' to
('form-data', {'name': 'upload', 'filename': 'file.txt'})
Mostly identical to cgi.parse_header and werkzeug.parse_options_header
but runs faster and handles special characters better.
Unescapes %22 to `"` and %0D%0A to `\n` in field values.
but runs faster and handles special characters better. Unescapes quotes.
"""
value = _firefox_quote_escape.sub("%22", value)
pos = value.find(";")
if pos == -1:
options: Dict[str, Union[int, str]] = {}
else:
options = {
m.group(1)
.lower(): (m.group(2) or m.group(3))
.replace("%22", '"')
.replace("%0D%0A", "\n")
m.group(1).lower(): m.group(2) or m.group(3).replace("%22", '"')
for m in _param.finditer(value[pos:])
}
value = value[:pos]

View File

@@ -877,10 +877,7 @@ class StartupMixin(metaclass=SanicMeta):
sync_manager.shutdown()
for sock in socks:
try:
sock.shutdown(SHUT_RDWR)
except OSError:
...
sock.shutdown(SHUT_RDWR)
sock.close()
socks = []
trigger_events(main_stop, loop, primary)

View File

@@ -1,19 +1,18 @@
from abc import ABC, abstractmethod
from html5tagger import HTML, Builder, Document
from html5tagger import HTML, Document
from sanic import __version__ as VERSION
from sanic.application.logo import SVG_LOGO_SIMPLE
from sanic.application.logo import SVG_LOGO
from sanic.pages.css import CSS
class BasePage(ABC, metaclass=CSS): # no cov
TITLE = "Sanic"
HEADING = None
TITLE = "Unknown"
CSS: str
doc: Builder
def __init__(self, debug: bool = True) -> None:
self.doc = Document(self.TITLE, lang="en")
self.debug = debug
@property
@@ -21,7 +20,6 @@ class BasePage(ABC, metaclass=CSS): # no cov
return self.CSS
def render(self) -> str:
self.doc = Document(self.TITLE, lang="en", id="sanic")
self._head()
self._body()
self._foot()
@@ -30,7 +28,7 @@ class BasePage(ABC, metaclass=CSS): # no cov
def _head(self) -> None:
self.doc.style(HTML(self.style))
with self.doc.header:
self.doc.div(self.HEADING or self.TITLE)
self.doc.div(self.TITLE)
def _foot(self) -> None:
with self.doc.footer:
@@ -39,23 +37,6 @@ class BasePage(ABC, metaclass=CSS): # no cov
self._sanic_logo()
if self.debug:
self.doc.div(f"Version {VERSION}")
with self.doc.div:
for idx, (title, href) in enumerate(
(
("Docs", "https://sanic.dev"),
("Help", "https://sanic.dev/en/help.html"),
("GitHub", "https://github.com/sanic-org/sanic"),
)
):
if idx > 0:
self.doc(" | ")
self.doc.a(
title,
href=href,
target="_blank",
referrerpolicy="no-referrer",
)
self.doc.div("DEBUG mode")
@abstractmethod
def _body(self) -> None:
@@ -63,7 +44,7 @@ class BasePage(ABC, metaclass=CSS): # no cov
def _sanic_logo(self) -> None:
self.doc.a(
HTML(SVG_LOGO_SIMPLE),
HTML(SVG_LOGO),
href="https://sanic.dev",
target="_blank",
referrerpolicy="no-referrer",

View File

@@ -24,8 +24,8 @@ class CSS(ABCMeta):
def __new__(cls, name, bases, attrs):
Page = super().__new__(cls, name, bases, attrs)
# Use a locally defined STYLE or the one from styles directory
Page.STYLE = _extract_style(attrs.get("STYLE_FILE"), name)
Page.STYLE += attrs.get("STYLE_APPEND", "")
s = _extract_style(attrs.get("STYLE"), name)
Page.STYLE = f"\n/* {name} */\n{s.strip()}\n" if s else ""
# Combine with all ancestor styles
Page.CSS = "".join(
Class.STYLE

View File

@@ -1,109 +0,0 @@
from typing import Any, Mapping
import tracerite.html
from html5tagger import E
from tracerite import html_traceback, inspector
from sanic.request import Request
from .base import BasePage
# Avoid showing the request in the traceback variable inspectors
inspector.blacklist_types += (Request,)
ENDUSER_TEXT = """\
We're sorry, but it looks like something went wrong. Please try refreshing \
the page or navigating back to the homepage. If the issue persists, our \
technical team is working to resolve it as soon as possible. We apologize \
for the inconvenience and appreciate your patience.\
"""
class ErrorPage(BasePage):
STYLE_APPEND = tracerite.html.style
def __init__(
self,
debug: bool,
title: str,
text: str,
request: Request,
exc: Exception,
) -> None:
super().__init__(debug)
name = request.app.name.replace("_", " ").strip()
if name.islower():
name = name.title()
self.TITLE = f"Application {name} cannot handle your request"
self.HEADING = E("Application ").strong(name)(
" cannot handle your request"
)
self.title = title
self.text = text
self.request = request
self.exc = exc
self.details_open = not getattr(exc, "quiet", False)
def _head(self) -> None:
self.doc._script(tracerite.html.javascript)
super()._head()
def _body(self) -> None:
debug = self.request.app.debug
route_name = self.request.name or "[route not found]"
with self.doc.main:
self.doc.h1(f"⚠️ {self.title}").p(self.text)
# Show context details if available on the exception
context = getattr(self.exc, "context", None)
if context:
self._key_value_table(
"Issue context", "exception-context", context
)
if not debug:
with self.doc.div(id="enduser"):
self.doc.p(ENDUSER_TEXT).p.a("Front Page", href="/")
return
# Show additional details in debug mode,
# open by default for 500 errors
with self.doc.details(open=self.details_open, class_="smalltext"):
# Show extra details if available on the exception
extra = getattr(self.exc, "extra", None)
if extra:
self._key_value_table(
"Issue extra data", "exception-extra", extra
)
self.doc.summary(
"Details for developers (Sanic debug mode only)"
)
if self.exc:
with self.doc.div(class_="exception-wrapper"):
self.doc.h2(f"Exception in {route_name}:")
self.doc(
html_traceback(self.exc, include_js_css=False)
)
self._key_value_table(
f"{self.request.method} {self.request.path}",
"request-headers",
self.request.headers,
)
def _key_value_table(
self, title: str, table_id: str, data: Mapping[str, Any]
) -> None:
with self.doc.div(class_="key-value-display"):
self.doc.h2(title)
with self.doc.dl(id=table_id, class_="key-value-table smalltext"):
for key, value in data.items():
# Reading values may cause a new exception, so suppress it
try:
value = str(value)
except Exception:
value = E.em("Unable to display value")
self.doc.dt.span(key, class_="nobr key").span(": ").dd(
value
)

View File

@@ -1,93 +1,37 @@
/** BasePage **/
:root {
--sanic: #ff0d68;
--sanic-yellow: #FFE900;
--sanic-background: #efeced;
--sanic-text: #121010;
--sanic-text-lighter: #756169;
--sanic-link: #ff0d68;
--sanic-block-background: #f7f4f6;
--sanic-block-text: #000;
--sanic-block-alt-text: #6b6468;
--sanic-header-background: #272325;
--sanic-header-border: #fff;
--sanic-header-text: #fff;
--sanic-highlight-background: var(--sanic-yellow);
--sanic-highlight-text: var(--sanic-text);
--sanic-tab-background: #f7f4f6;
--sanic-tab-shadow: #f7f6f6;
--sanic-tab-text: #222021;
--sanic-tracerite-var: var(--sanic-text);
--sanic-tracerite-val: #ff0d68;
--sanic-tracerite-type: #6d6a6b;
}
@media (prefers-color-scheme: dark) {
:root {
--sanic-text: #f7f4f6;
--sanic-background: #121010;
--sanic-block-background: #0f0d0e;
--sanic-block-text: #f7f4f6;
--sanic-header-background: #030203;
--sanic-header-border: #000;
--sanic-highlight-text: var(--sanic-background);
--sanic-tab-background: #292728;
--sanic-tab-shadow: #0f0d0e;
--sanic-tab-text: #aea7ab;
}
}
html {
font: 16px sans-serif;
background: var(--sanic-background);
color: var(--sanic-text);
scrollbar-gutter: stable;
overflow: hidden auto;
background: #eee;
color: #111;
}
body {
margin: 0;
font-size: 1.25rem;
line-height: 125%;
}
body>* {
padding: 1rem 2vw;
}
@media (max-width: 1000px) {
@media (max-width: 1200px) {
body>* {
padding: 0.5rem 1.5vw;
}
html {
/* Scale everything by rem of 6px-16px by viewport width */
font-size: calc(6px + 10 * 100vw / 1000);
body {
font-size: 1rem;
}
}
main {
/* Make sure the footer is closer to bottom */
min-height: 70vh;
/* Generous padding for readability */
padding: 1rem 2.5rem;
}
.smalltext {
font-size: 1.0rem;
}
.container {
min-width: 600px;
max-width: 1600px;
}
header {
background: var(--sanic-header-background);
color: var(--sanic-header-text);
border-bottom: 1px solid var(--sanic-header-border);
background: #111;
color: #e1e1e1;
border-bottom: 1px solid #272727;
text-align: center;
}
@@ -96,17 +40,20 @@ footer {
display: flex;
flex-direction: column;
font-size: 0.8rem;
margin: 2rem;
line-height: 1.5em;
margin-top: 2rem;
}
h1 {
text-align: left;
}
a:visited {
color: inherit;
}
a {
text-decoration: none;
color: var(--sanic-link);
color: #88f;
}
a:hover,
@@ -115,32 +62,18 @@ a:focus {
outline: none;
}
#logo {
height: 1.75rem;
padding: 0 0.25rem;
}
span.icon {
margin-right: 1rem;
}
#logo-simple {
height: 1.75rem;
padding: 0 0.25rem;
}
@media (prefers-color-scheme: dark) {
#logo-simple path:last-child {
fill: #e1e1e1;
html {
background: #111;
color: #ccc;
}
}
#sanic pre,
#sanic code {
font-family: "Fira Code",
"Source Code Pro",
Menlo,
Meslo,
Monaco,
Consolas,
Lucida Console,
monospace;
font-size: 0.8rem;
}

View File

@@ -1,4 +1,3 @@
/** DirectoryPage **/
#breadcrumbs>a:hover {
text-decoration: none;
}

View File

@@ -1,108 +0,0 @@
/** ErrorPage **/
#enduser {
max-width: 30em;
margin: 5em auto 5em auto;
text-align: justify;
/*text-justify: both;*/
}
#enduser a {
color: var(--sanic-blue);
}
#enduser p:last-child {
text-align: right;
}
summary {
margin-top: 3em;
color: var(--sanic-text-lighter);
cursor: pointer;
}
.tracerite {
--tracerite-var: var(--sanic-tracerite-var);
--tracerite-val: var(--sanic-tracerite-val);
--tracerite-type: var(--sanic-tracerite-type);
--tracerite-exception: var(--sanic);
--tracerite-highlight: var(--sanic-yellow);
--tracerite-tab: var(--sanic-tab-background);
--tracerite-tab-text: var(--sanic-tab-text);
}
.tracerite>h3 {
margin: 0.5rem 0 !important;
}
#sanic .tracerite .traceback-labels button {
font-size: 0.8rem;
line-height: 120%;
background: var(--tracerite-tab);
color: var(--tracerite-tab-text);
transition: 0.3s;
cursor: pointer;
}
.tracerite .traceback-labels {
padding-top: 5px;
}
.tracerite .traceback-labels button:hover {
filter: contrast(150%) brightness(120%) drop-shadow(0 -0 2px var(--sanic-tab-shadow));
}
#sanic .tracerite .tracerite-tooltip::before {
bottom: 1.75em;
}
#sanic .tracerite .traceback-details mark span {
background: var(--sanic-highlight-background);
color: var(--sanic-highlight-text);
}
header {
background: var(--sanic-header-background);
}
h2 {
font-size: 1.3rem;
color: var(--sanic-text);
}
.key-value-display,
.exception-wrapper {
padding: 0.5rem;
margin-top: 1rem;
}
.key-value-display {
background-color: var(--sanic-block-background);
color: var(--sanic-block-text);
}
.key-value-display h2 {
margin-bottom: 0.2em;
}
dl.key-value-table {
width: 100%;
margin: 0;
display: grid;
grid-template-columns: 1fr 5fr;
grid-gap: .3em;
white-space: pre-wrap;
}
dl.key-value-table * {
margin: 0;
}
dl.key-value-table dt {
color: var(--sanic-block-alt-text);
word-break: break-word;
}
dl.key-value-table dd {
/* Better breaking for cookies header and such */
word-break: break-all;
}

File diff suppressed because it is too large Load Diff

View File

@@ -39,13 +39,13 @@ class Router(BaseRouter):
extra={"host": host} if host else None,
)
except RoutingNotFound as e:
raise NotFound(f"Requested URL {e.path} not found") from None
raise NotFound("Requested URL {} not found".format(e.path))
except NoMethod as e:
raise MethodNotAllowed(
f"Method {method} not allowed for URL {path}",
"Method {} not allowed for URL {}".format(method, path),
method=method,
allowed_methods=e.allowed_methods,
) from None
)
@lru_cache(maxsize=ROUTER_CACHE_SIZE)
def get( # type: ignore
@@ -61,7 +61,6 @@ class Router(BaseRouter):
correct response
:rtype: Tuple[ Route, RouteHandler, Dict[str, Any]]
"""
__tracebackhide__ = True
return self._get(path, method, host)
def add( # type: ignore

View File

@@ -1,4 +1,2 @@
[flake8]
ignore = E203, W503
per-file-ignores =
sanic/app.py:E402

View File

@@ -112,7 +112,6 @@ requirements = [
"websockets>=10.0",
"multidict>=5.0,<7.0",
"html5tagger>=1.2.1",
"tracerite>=1.0.0",
]
tests_require = [

View File

@@ -351,7 +351,6 @@ async def test_websocket_text_receive(send, receive, message_stack):
assert text == msg["text"]
@pytest.mark.asyncio
async def test_websocket_bytes_receive(send, receive, message_stack):
msg = {"bytes": b"hello", "type": "websocket.receive"}
@@ -362,7 +361,6 @@ async def test_websocket_bytes_receive(send, receive, message_stack):
assert data == msg["bytes"]
@pytest.mark.asyncio
async def test_websocket_accept_with_no_subprotocols(
send, receive, message_stack

View File

@@ -1,14 +1,12 @@
import logging
import pytest
from sanic import Sanic
from sanic.config import Config
from sanic.errorpages import TextRenderer, exception_response, guess_mime
from sanic.errorpages import HTMLRenderer, exception_response
from sanic.exceptions import NotFound, SanicException
from sanic.handlers import ErrorHandler
from sanic.request import Request
from sanic.response import HTTPResponse, empty, html, json, text
from sanic.response import HTTPResponse, html, json, text
@pytest.fixture
@@ -19,44 +17,6 @@ def app():
def err(request):
raise Exception("something went wrong")
@app.get("/forced_json/<fail>", error_format="json")
def manual_fail(request, fail):
if fail == "fail":
raise Exception
return html("") # Should be ignored
@app.get("/empty/<fail>")
def empty_fail(request, fail):
if fail == "fail":
raise Exception
return empty()
@app.get("/json/<fail>")
def json_fail(request, fail):
if fail == "fail":
raise Exception
# After 23.3 route format should become json, older versions think it
# is mixed due to empty mapping to html, and don't find any format.
return json({"foo": "bar"}) if fail == "json" else empty()
@app.get("/html/<fail>")
def html_fail(request, fail):
if fail == "fail":
raise Exception
return html("<h1>foo</h1>")
@app.get("/text/<fail>")
def text_fail(request, fail):
if fail == "fail":
raise Exception
return text("foo")
@app.get("/mixed/<param>")
def mixed_fail(request, param):
if param not in ("json", "html"):
raise Exception
return json({}) if param == "json" else html("")
return app
@@ -68,14 +28,14 @@ def fake_request(app):
@pytest.mark.parametrize(
"fallback,content_type, exception, status",
(
(None, "text/plain; charset=utf-8", Exception, 500),
(None, "text/html; charset=utf-8", Exception, 500),
("html", "text/html; charset=utf-8", Exception, 500),
("auto", "text/plain; charset=utf-8", Exception, 500),
("auto", "text/html; charset=utf-8", Exception, 500),
("text", "text/plain; charset=utf-8", Exception, 500),
("json", "application/json", Exception, 500),
(None, "text/plain; charset=utf-8", NotFound, 404),
(None, "text/html; charset=utf-8", NotFound, 404),
("html", "text/html; charset=utf-8", NotFound, 404),
("auto", "text/plain; charset=utf-8", NotFound, 404),
("auto", "text/html; charset=utf-8", NotFound, 404),
("text", "text/plain; charset=utf-8", NotFound, 404),
("json", "application/json", NotFound, 404),
),
@@ -83,10 +43,6 @@ def fake_request(app):
def test_should_return_html_valid_setting(
fake_request, fallback, content_type, exception, status
):
# Note: if fallback is None or "auto", prior to PR #2668 base was returned
# and after that a text response is given because it matches */*. Changed
# base to TextRenderer in this test, like it is in Sanic itself, so the
# test passes with either version but still covers everything that it did.
if fallback:
fake_request.app.config.FALLBACK_ERROR_FORMAT = fallback
@@ -97,7 +53,7 @@ def test_should_return_html_valid_setting(
fake_request,
e,
True,
base=TextRenderer,
base=HTMLRenderer,
fallback=fake_request.app.config.FALLBACK_ERROR_FORMAT,
)
@@ -303,16 +259,15 @@ def test_fallback_with_content_type_mismatch_accept(app):
"accept,content_type,expected",
(
(None, None, "text/plain; charset=utf-8"),
("foo/bar", None, "text/plain; charset=utf-8"),
("foo/bar", None, "text/html; charset=utf-8"),
("application/json", None, "application/json"),
("application/json,text/plain", None, "application/json"),
("text/plain,application/json", None, "application/json"),
("text/plain,foo/bar", None, "text/plain; charset=utf-8"),
("text/plain,text/html", None, "text/plain; charset=utf-8"),
("*/*", "foo/bar", "text/plain; charset=utf-8"),
# Following test is valid after v22.3
# ("text/plain,text/html", None, "text/plain; charset=utf-8"),
("*/*", "foo/bar", "text/html; charset=utf-8"),
("*/*", "application/json", "application/json"),
# App wants text/plain but accept has equal entries for it
("text/*,*/plain", None, "text/plain; charset=utf-8"),
),
)
def test_combinations_for_auto(fake_request, accept, content_type, expected):
@@ -331,7 +286,7 @@ def test_combinations_for_auto(fake_request, accept, content_type, expected):
fake_request,
e,
True,
base=TextRenderer,
base=HTMLRenderer,
fallback="auto",
)
@@ -421,109 +376,3 @@ def test_config_fallback_bad_value(app):
message = "Unknown format: fake"
with pytest.raises(SanicException, match=message):
app.config.FALLBACK_ERROR_FORMAT = "fake"
@pytest.mark.parametrize(
"route_format,fallback,accept,expected",
(
(
"json",
"html",
"*/*",
"The client accepts */*, using 'json' from fakeroute",
),
(
"json",
"auto",
"text/html,*/*;q=0.8",
"The client accepts text/html, using 'html' from any",
),
(
"json",
"json",
"text/html,*/*;q=0.8",
"The client accepts */*;q=0.8, using 'json' from fakeroute",
),
(
"",
"html",
"text/*,*/plain",
"The client accepts text/*, using 'html' from FALLBACK_ERROR_FORMAT",
),
(
"",
"json",
"text/*,*/*",
"The client accepts */*, using 'json' from FALLBACK_ERROR_FORMAT",
),
(
"",
"auto",
"*/*,application/json;q=0.5",
"The client accepts */*, using 'json' from request.accept",
),
(
"",
"auto",
"*/*",
"The client accepts */*, using 'json' from content-type",
),
(
"",
"auto",
"text/html,text/plain",
"The client accepts text/plain, using 'text' from any",
),
(
"",
"auto",
"text/html,text/plain;q=0.9",
"The client accepts text/html, using 'html' from any",
),
(
"html",
"json",
"application/xml",
"No format found, the client accepts [application/xml]",
),
("", "auto", "*/*", "The client accepts */*, using 'text' from any"),
("", "", "*/*", "No format found, the client accepts [*/*]"),
# DEPRECATED: remove in 24.3
(
"",
"auto",
"*/*",
"The client accepts */*, using 'json' from request.json",
),
),
)
def test_guess_mime_logging(
caplog, fake_request, route_format, fallback, accept, expected
):
class FakeObject:
pass
fake_request.route = FakeObject()
fake_request.route.name = "fakeroute"
fake_request.route.extra = FakeObject()
fake_request.route.extra.error_format = route_format
if accept is None:
del fake_request.headers["accept"]
else:
fake_request.headers["accept"] = accept
if "content-type" in expected:
fake_request.headers["content-type"] = "application/json"
# Fake JSON content (DEPRECATED: remove in 24.3)
if "request.json" in expected:
fake_request.parsed_json = {"foo": "bar"}
with caplog.at_level(logging.DEBUG, logger="sanic.root"):
guess_mime(fake_request, fallback)
(logmsg,) = [
r.message for r in caplog.records if r.funcName == "guess_mime"
]
assert logmsg == expected

View File

@@ -23,11 +23,11 @@ from sanic.exceptions import (
from sanic.response import text
def dl_to_dict(soup, dl_id):
def dl_to_dict(soup, css_class):
keys, values = [], []
for dl in soup.find_all("dl", {"id": dl_id}):
for dl in soup.find_all("dl", {"class": css_class}):
for dt in dl.find_all("dt"):
keys.append(dt.text.split(":", 1)[0])
keys.append(dt.text.strip())
for dd in dl.find_all("dd"):
values.append(dd.text.strip())
return dict(zip(keys, values))
@@ -194,7 +194,10 @@ def test_handled_unhandled_exception(exception_app):
assert "Internal Server Error" in soup.h1.text
message = " ".join(soup.p.text.split())
assert "The application encountered an unexpected error" in message
assert message == (
"The server encountered an internal error and "
"cannot complete your request."
)
def test_exception_in_exception_handler(exception_app):
@@ -296,7 +299,7 @@ def test_contextual_exception_context(debug):
_, response = app.test_client.post("/coffee/html", debug=debug)
soup = BeautifulSoup(response.body, "html.parser")
dl = dl_to_dict(soup, "exception-context")
dl = dl_to_dict(soup, "context")
assert response.status == 418
assert "Sorry, I cannot brew coffee" in soup.find("p").text
assert dl == {"foo": "bar"}
@@ -337,7 +340,7 @@ def test_contextual_exception_extra(debug):
_, response = app.test_client.post("/coffee/html", debug=debug)
soup = BeautifulSoup(response.body, "html.parser")
dl = dl_to_dict(soup, "exception-extra")
dl = dl_to_dict(soup, "extra")
assert response.status == 418
assert "Found bar" in soup.find("p").text
if debug:

View File

@@ -123,10 +123,10 @@ def test_html_traceback_output_in_debug_mode(exception_handler_app: Sanic):
assert "handler_4" in html
assert "foo = bar" in html
summary_text = soup.select("h3")[0].text
assert "NameError: name 'bar' is not defined" == summary_text
request_text = soup.select("h2")[-1].text
assert "GET /4" == request_text
summary_text = " ".join(soup.select(".summary")[0].text.split())
assert (
"NameError: name 'bar' is not defined while handling path /4"
) == summary_text
def test_inherited_exception_handler(exception_handler_app: Sanic):
@@ -146,10 +146,11 @@ def test_chained_exception_handler(exception_handler_app: Sanic):
assert "handler_6" in html
assert "foo = 1 / arg" in html
assert "ValueError" in html
assert "GET /6" in html
summary_text = soup.select("h3")[0].text
assert "ZeroDivisionError: division by zero" == summary_text
summary_text = " ".join(soup.select(".summary")[0].text.split())
assert (
"ZeroDivisionError: division by zero while handling path /6/0"
) == summary_text
def test_exception_handler_lookup(exception_handler_app: Sanic):

View File

@@ -2,16 +2,12 @@ from unittest.mock import Mock
import pytest
from sanic import Sanic, headers, json, text
from sanic import headers, text
from sanic.exceptions import InvalidHeader, PayloadTooLarge
from sanic.http import Http
from sanic.request import Request
def make_request(headers) -> Request:
return Request(b"/", headers, "1.1", "GET", None, None)
@pytest.fixture
def raised_ceiling():
Http.HEADER_CEILING = 32_768
@@ -49,17 +45,29 @@ def raised_ceiling():
("attachment", {"filename": "strange;name", "size": "123"}),
),
(
'form-data; name="foo"; value="%22\\%0D%0A"',
("form-data", {"name": "foo", "value": '"\\\n'}),
'form-data; name="files"; filename="fo\\"o;bar\\"',
("form-data", {"name": "files", "filename": 'fo"o;bar\\'})
# cgi.parse_header:
# ('form-data', {'name': 'files', 'filename': 'fo"o;bar\\'})
# werkzeug.parse_options_header:
# ('form-data', {'name': 'files', 'filename': '"fo\\"o', 'bar\\"': None})
),
# <input type=file name="foo&quot;;bar\"> with Unicode filename!
(
# Chrome, Firefox:
# Chrome:
# Content-Disposition: form-data; name="foo%22;bar\"; filename="😀"
'form-data; name="foo%22;bar\\"; filename="😀"',
("form-data", {"name": 'foo";bar\\', "filename": "😀"})
# cgi: ('form-data', {'name': 'foo%22;bar"; filename="😀'})
# werkzeug (pre 2.3.0): ('form-data', {'name': 'foo%22;bar"; filename='})
# werkzeug: ('form-data', {'name': 'foo%22;bar"; filename='})
),
(
# Firefox:
# Content-Disposition: form-data; name="foo\";bar\"; filename="😀"
'form-data; name="foo\\";bar\\"; filename="😀"',
("form-data", {"name": 'foo";bar\\', "filename": "😀"})
# cgi: ('form-data', {'name': 'foo";bar"; filename="😀'})
# werkzeug: ('form-data', {'name': 'foo";bar"; filename='})
),
],
)
@@ -177,27 +185,22 @@ def test_request_line(app):
assert request.request_line == b"GET / HTTP/1.1"
@pytest.mark.parametrize(
"raw,expected_subtype",
"raw",
(
("show/first, show/second", "first"),
("show/*, show/first", "first"),
("*/*, show/first", "first"),
("*/*, show/*", "*"),
("other/*; q=0.1, show/*; q=0.2", "*"),
("show/first; q=0.5, show/second; q=0.5", "first"),
("show/first; foo=bar, show/second; foo=bar", "first"),
("show/second, show/first; foo=bar", "first"),
("show/second; q=0.5, show/first; foo=bar; q=0.5", "first"),
("show/second; q=0.5, show/first; q=1.0", "first"),
("show/first, show/second; q=1.0", "second"),
),
"text/html, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8",
"application/xml;q=0.9, */*;q=0.8, text/html, application/xhtml+xml",
"foo/bar;q=0.9, */*;q=0.8, text/html=0.8, text/plain, application/xhtml+xml",
)
)
def test_parse_accept_ordered_okay(raw, expected_subtype):
ordered = headers.parse_accept(raw)
assert ordered[0].type == "show"
assert ordered[0].subtype == expected_subtype
def test_accept_ordering(raw):
"""Should sort by q but also be stable."""
accept = headers.parse_accept(raw)
assert accept[0].type_ == "text"
raw1 = ", ".join(str(a) for a in accept)
accept = headers.parse_accept(raw1)
raw2 = ", ".join(str(a) for a in accept)
assert raw1 == raw2
@pytest.mark.parametrize(
@@ -206,7 +209,6 @@ def test_parse_accept_ordered_okay(raw, expected_subtype):
"missing",
"missing/",
"/missing",
"/",
),
)
def test_bad_accept(raw):
@@ -222,99 +224,75 @@ def test_empty_accept():
def test_wildcard_accept_set_ok():
accept = headers.parse_accept("*/*")[0]
assert accept.type == "*"
assert accept.subtype == "*"
assert accept.is_wildcard
assert accept.has_wildcard
accept = headers.parse_accept("foo/*")[0]
assert accept.type == "foo"
assert accept.subtype == "*"
assert not accept.is_wildcard
assert accept.has_wildcard
accept = headers.parse_accept("*/bar")[0]
assert not accept.is_wildcard
assert accept.has_wildcard
accept = headers.parse_accept("foo/bar")[0]
assert accept.type == "foo"
assert accept.subtype == "bar"
assert not accept.is_wildcard
assert not accept.has_wildcard
def test_accept_parsed_against_str():
accept = headers.Matched.parse("foo/bar")
assert accept == "foo/bar; q=0.1"
def test_media_type_matching():
assert headers.MediaType("foo", "bar").match(
headers.MediaType("foo", "bar")
)
assert headers.MediaType("foo", "bar").match("foo/bar")
@pytest.mark.parametrize(
"value,other,outcome",
"value,other,outcome,allow_type,allow_subtype",
(
# ALLOW BOTH
("foo/bar", "foo/bar", True),
("foo/bar", headers.Matched.parse("foo/bar"), True),
("foo/bar", "foo/*", True),
("foo/bar", headers.Matched.parse("foo/*"), True),
("foo/bar", "*/*", True),
("foo/bar", headers.Matched.parse("*/*"), True),
("foo/*", "foo/bar", True),
("foo/*", headers.Matched.parse("foo/bar"), True),
("foo/*", "foo/*", True),
("foo/*", headers.Matched.parse("foo/*"), True),
("foo/*", "*/*", True),
("foo/*", headers.Matched.parse("*/*"), True),
("*/*", "foo/bar", True),
("*/*", headers.Matched.parse("foo/bar"), True),
("*/*", "foo/*", True),
("*/*", headers.Matched.parse("foo/*"), True),
("*/*", "*/*", True),
("*/*", headers.Matched.parse("*/*"), True),
("foo/bar", "foo/bar", True, True, True),
("foo/bar", "foo/*", True, True, True),
("foo/bar", "*/*", True, True, True),
("foo/*", "foo/bar", True, True, True),
("foo/*", "foo/*", True, True, True),
("foo/*", "*/*", True, True, True),
("*/*", "foo/bar", True, True, True),
("*/*", "foo/*", True, True, True),
("*/*", "*/*", True, True, True),
# ALLOW TYPE
("foo/bar", "foo/bar", True, True, False),
("foo/bar", "foo/*", False, True, False),
("foo/bar", "*/*", False, True, False),
("foo/*", "foo/bar", False, True, False),
("foo/*", "foo/*", False, True, False),
("foo/*", "*/*", False, True, False),
("*/*", "foo/bar", False, True, False),
("*/*", "foo/*", False, True, False),
("*/*", "*/*", False, True, False),
# ALLOW SUBTYPE
("foo/bar", "foo/bar", True, False, True),
("foo/bar", "foo/*", True, False, True),
("foo/bar", "*/*", False, False, True),
("foo/*", "foo/bar", True, False, True),
("foo/*", "foo/*", True, False, True),
("foo/*", "*/*", False, False, True),
("*/*", "foo/bar", False, False, True),
("*/*", "foo/*", False, False, True),
("*/*", "*/*", False, False, True),
),
)
def test_accept_matching(value, other, outcome):
assert bool(headers.Matched.parse(value).match(other)) is outcome
@pytest.mark.parametrize("value", ("foo/bar", "foo/*", "*/*"))
def test_value_in_accept(value):
acceptable = headers.parse_accept(value)
assert acceptable.match("foo/bar")
assert acceptable.match("foo/*")
assert acceptable.match("*/*")
def test_accept_matching(value, other, outcome, allow_type, allow_subtype):
assert (
bool(headers.MediaType._parse(value).match(
other,
allow_type_wildcard=allow_type,
allow_subtype_wildcard=allow_subtype,
))
is outcome
)
@pytest.mark.parametrize("value", ("foo/bar", "foo/*"))
def test_value_not_in_accept(value):
acceptable = headers.parse_accept(value)
assert not acceptable.match("no/match")
assert not acceptable.match("no/*")
assert "*/*" not in acceptable
assert "*/bar" not in acceptable
@pytest.mark.parametrize(
"header,expected",
(
(
"text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", # noqa: E501
[
"text/html",
"application/xhtml+xml",
"image/avif",
"image/webp",
"application/xml;q=0.9",
"*/*;q=0.8",
],
),
),
)
def test_browser_headers_general(header, expected):
request = Request(b"/", {"accept": header}, "1.1", "GET", None, None)
assert [str(item) for item in request.accept] == expected
@pytest.mark.parametrize(
"header,expected",
(
@@ -331,139 +309,12 @@ def test_browser_headers_general(header, expected):
),
),
)
def test_browser_headers_specific(header, expected):
def test_browser_headers(header, expected):
mimes = [e[0] for e in expected]
qs = [e[1] for e in expected]
request = Request(b"/", {"accept": header}, "1.1", "GET", None, None)
assert request.accept == mimes
for a, m, q in zip(request.accept, mimes, qs):
assert a == m
assert a.mime == m
assert a.str == m
assert a.q == q
@pytest.mark.parametrize(
"raw",
(
"text/html, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8",
"application/xml;q=0.9, */*;q=0.8, text/html, application/xhtml+xml",
(
"foo/bar;q=0.9, */*;q=0.8, text/html=0.8, "
"text/plain, application/xhtml+xml"
),
),
)
def test_accept_ordering(raw):
"""Should sort by q but also be stable."""
accept = headers.parse_accept(raw)
assert accept[0].type == "text"
raw1 = ", ".join(str(a) for a in accept)
accept = headers.parse_accept(raw1)
raw2 = ", ".join(str(a) for a in accept)
assert raw1 == raw2
def test_not_accept_wildcard():
accept = headers.parse_accept("*/*, foo/*, */bar, foo/bar;q=0.1")
assert not accept.match(
"text/html", "foo/foo", "bar/bar", accept_wildcards=False
)
# Should ignore wildcards in accept but still matches them from mimes
m = accept.match("text/plain", "*/*", accept_wildcards=False)
assert m.mime == "*/*"
assert m.match("*/*")
assert m.header == "foo/bar"
assert not accept.match(
"text/html", "foo/foo", "bar/bar", accept_wildcards=False
)
def test_accept_misc():
header = (
"foo/bar;q=0.0, */plain;param=123, text/plain, text/*, foo/bar;q=0.5"
)
a = headers.parse_accept(header)
assert repr(a) == (
"[*/plain;param=123, text/plain, text/*, "
"foo/bar;q=0.5, foo/bar;q=0.0]"
) # noqa: E501
assert str(a) == (
"*/plain;param=123, text/plain, text/*, "
"foo/bar;q=0.5, foo/bar;q=0.0"
) # noqa: E501
# q=1 types don't match foo/bar but match the two others,
# text/* comes first and matches */plain because it
# comes first in the header
m = a.match("foo/bar", "text/*", "text/plain")
assert repr(m) == "<text/* matched */plain;param=123>"
assert m == "text/*"
assert m.mime == "text/*"
assert m.header.mime == "*/plain"
assert m.header.type == "*"
assert m.header.subtype == "plain"
assert m.header.q == 1.0
assert m.header.params == dict(param="123")
# Matches object against another Matched object (by mime and header)
assert m == a.match("text/*")
# Against unsupported type falls back to object id matching
assert m != 123
# Matches the highest q value
m = a.match("foo/bar")
assert repr(m) == "<foo/bar matched foo/bar;q=0.5>"
assert m == "foo/bar"
assert m == "foo/bar;q=0.5"
# Matching nothing special case
m = a.match()
assert m == ""
assert m.header is None
# No header means anything
a = headers.parse_accept(None)
assert a == ["*/*"]
assert a.match("foo/bar")
# Empty header means nothing
a = headers.parse_accept("")
assert a == []
assert not a.match("foo/bar")
@pytest.mark.parametrize(
"headers,expected",
(
({"foo": "bar"}, "bar"),
((("foo", "bar"), ("foo", "baz")), "bar,baz"),
({}, ""),
),
)
def test_field_simple_accessor(headers, expected):
request = make_request(headers)
assert request.headers.foo == request.headers.foo_ == expected
@pytest.mark.parametrize(
"headers,expected",
(
({"foo-bar": "bar"}, "bar"),
((("foo-bar", "bar"), ("foo-bar", "baz")), "bar,baz"),
),
)
def test_field_hyphenated_accessor(headers, expected):
request = make_request(headers)
assert request.headers.foo_bar == request.headers.foo_bar_ == expected
def test_bad_accessor():
request = make_request({})
msg = "'Header' object has no attribute '_foo'"
with pytest.raises(AttributeError, match=msg):
request.headers._foo
def test_multiple_fields_accessor(app: Sanic):
@app.get("")
async def handler(request: Request):
return json({"field": request.headers.example_field})
_, response = app.test_client.get(
"/", headers=(("Example-Field", "Foo, Bar"), ("Example-Field", "Baz"))
)
assert response.json["field"] == "Foo, Bar,Baz"

View File

@@ -150,47 +150,33 @@ def test_request_accept():
async def get(request):
return response.empty()
header_value = "text/plain;format=flowed, text/plain, text/*, */*"
request, _ = app.test_client.get(
"/",
headers={"Accept": header_value},
headers={
"Accept": "text/*, text/plain, text/plain;format=flowed, */*"
},
)
assert str(request.accept) == header_value
match = request.accept.match(
"*/*;format=flowed",
assert request.accept == [
"text/plain;format=flowed",
"text/plain",
"text/*",
"*/*",
)
assert match == "*/*;format=flowed"
assert match.header.mime == "text/plain"
assert match.header.params == {"format": "flowed"}
]
header_value = (
"text/plain; q=0.5, text/html, text/x-dvi; q=0.8, text/x-c"
)
request, _ = app.test_client.get(
"/",
headers={"Accept": header_value},
headers={
"Accept": (
"text/plain; q=0.5, text/html, text/x-dvi; q=0.8, text/x-c"
)
},
)
assert [str(i) for i in request.accept] == [
assert request.accept == [
"text/html",
"text/x-c",
"text/x-dvi;q=0.8",
"text/plain;q=0.5",
"text/x-dvi; q=0.8",
"text/plain; q=0.5",
]
match = request.accept.match(
"application/json",
"text/plain", # Has lower q in accept header
"text/html;format=flowed", # Params mismatch
"text/*", # Matches
"*/*",
)
assert match == "text/*"
assert match.header.mime == "text/html"
assert match.header.q == 1.0
assert not match.header.params
def test_bad_url_parse():