Compare commits

..

8 Commits

Author SHA1 Message Date
Adam Hopkins
c695c5250a squash 2023-03-09 10:37:41 +02:00
Adam Hopkins
23c1eaab29 POC for compatibility mode 2023-03-09 09:54:32 +02:00
L. Kärkkäinen
a5d7d03413 Nicer traceback formatting (#2667)
Co-authored-by: L. Kärkkäinen <98187+Tronic@users.noreply.github.com>
Co-authored-by: Adam Hopkins <adam@amhopkins.com>
Co-authored-by: L. Karkkainen <tronic@users.noreply.github.com>
Co-authored-by: SML <smlbiobot@gmail.com>
2023-03-06 21:24:12 +02:00
L. Kärkkäinen
259e458847 Simplified parse_content_header escaping (#2707) 2023-03-06 06:39:16 +02:00
Adam Hopkins
cb49c2b26d Add header accessors (#2696) 2023-02-28 00:26:53 +02:00
L. Kärkkäinen
dfc0704831 Error page rendering format selection (#2668)
Co-authored-by: Adam Hopkins <adam@amhopkins.com>
Co-authored-by: L. Karkkainen <tronic@users.noreply.github.com>
2023-02-26 08:25:10 +02:00
Adam Hopkins
d238995f1b Refresh Request.accept functionality (#2687) 2023-02-21 08:22:51 +02:00
Mohammad Almoghrabi
6f5303e080 check the status of socket before shutting down (#2680)
* check the status of socket before shutting down

* remove socket status checking & ignore OSError exception
2023-02-14 22:59:41 +02:00
27 changed files with 2033 additions and 1777 deletions

View File

@@ -24,5 +24,6 @@ module = [
"sanic_routing.*",
"aioquic.*",
"html5tagger.*",
"tracerite.*",
]
ignore_missing_imports = true

View File

@@ -1 +1,10 @@
__version__ = "23.3.0"
__compatibility__ = "22.12"
from inspect import currentframe, stack
for frame_info in stack():
if frame_info.frame is not currentframe():
value = frame_info.frame.f_globals.get("__SANIC_COMPATIBILITY__")
if value:
__compatibility__ = value

View File

@@ -157,7 +157,6 @@ class Sanic(StaticHandleMixin, BaseSanic, StartupMixin, metaclass=TouchUpMeta):
"strict_slashes",
"websocket_enabled",
"websocket_tasks",
"wrappers",
)
_app_registry: Dict[str, "Sanic"] = {}

View File

@@ -105,7 +105,6 @@ class Blueprint(BaseSanic):
"version",
"version_prefix",
"websocket_routes",
"wrappers",
)
def __init__(

View File

@@ -0,0 +1,18 @@
from sanic.__version__ import __compatibility__
if __compatibility__ == "22.12":
from .v22_12.request import (
File,
Request,
RequestParameters,
parse_multipart_form,
)
elif __compatibility__ == "23.3":
from .v23_3.request import (
File,
Request,
RequestParameters,
parse_multipart_form,
)
else:
raise RuntimeError(f"Unknown compatibility value: {__compatibility__}")

View File

File diff suppressed because it is too large Load Diff

View File

View File

@@ -0,0 +1,9 @@
from ..v22_12.request import File
from ..v22_12.request import Request as LegacyRequest
from ..v22_12.request import RequestParameters, parse_multipart_form
class Request(LegacyRequest):
@property
def something_new(self):
return 123

View File

@@ -88,6 +88,12 @@ class Header(CIMultiDict):
very similar to a regular dictionary.
"""
def __getattr__(self, key: str) -> str:
if key.startswith("_"):
return self.__getattribute__(key)
key = key.rstrip("_").replace("_", "-")
return ",".join(self.getall(key, default=[]))
def get_all(self, key: str):
"""
Convenience method mapped to ``getall()``.

View File

@@ -22,6 +22,7 @@ from traceback import extract_tb
from sanic.exceptions import BadRequest, SanicException
from sanic.helpers import STATUS_CODES
from sanic.log import deprecation, logger
from sanic.pages.error import ErrorPage
from sanic.response import html, json, text
@@ -38,11 +39,11 @@ if t.TYPE_CHECKING:
from sanic import HTTPResponse, Request
DEFAULT_FORMAT = "auto"
FALLBACK_TEXT = (
"The server encountered an internal error and "
"cannot complete your request."
)
FALLBACK_TEXT = """\
The application encountered an unexpected error and could not continue.\
"""
FALLBACK_STATUS = 500
JSON = "application/json"
class BaseRenderer:
@@ -116,119 +117,18 @@ class HTMLRenderer(BaseRenderer):
The default fallback type.
"""
TRACEBACK_STYLE = """
html { font-family: sans-serif }
h2 { color: #888; }
.tb-wrapper p, dl, dd { margin: 0 }
.frame-border { margin: 1rem }
.frame-line > *, dt, dd { padding: 0.3rem 0.6rem }
.frame-line, dl { margin-bottom: 0.3rem }
.frame-code, dd { font-size: 16px; padding-left: 4ch }
.tb-wrapper, dl { border: 1px solid #eee }
.tb-header,.obj-header {
background: #eee; padding: 0.3rem; font-weight: bold
}
.frame-descriptor, dt { background: #e2eafb; font-size: 14px }
"""
TRACEBACK_WRAPPER_HTML = (
"<div class=tb-header>{exc_name}: {exc_value}</div>"
"<div class=tb-wrapper>{frame_html}</div>"
)
TRACEBACK_BORDER = (
"<div class=frame-border>"
"The above exception was the direct cause of the following exception:"
"</div>"
)
TRACEBACK_LINE_HTML = (
"<div class=frame-line>"
"<p class=frame-descriptor>"
"File {0.filename}, line <i>{0.lineno}</i>, "
"in <code><b>{0.name}</b></code>"
"<p class=frame-code><code>{0.line}</code>"
"</div>"
)
OBJECT_WRAPPER_HTML = (
"<div class=obj-header>{title}</div>"
"<dl class={obj_type}>{display_html}</dl>"
)
OBJECT_DISPLAY_HTML = "<dt>{key}</dt><dd><code>{value}</code></dd>"
OUTPUT_HTML = (
"<!DOCTYPE html><html lang=en>"
"<meta charset=UTF-8><title>{title}</title>\n"
"<style>{style}</style>\n"
"<h1>{title}</h1><p>{text}\n"
"{body}"
)
def _page(self, full: bool) -> HTTPResponse:
def full(self) -> HTTPResponse:
page = ErrorPage(
debug=self.debug,
title=super().title,
text=super().text,
request=self.request,
exc=self.exception,
full=full,
)
return html(page.render(), status=self.status, headers=self.headers)
def full(self) -> HTTPResponse:
return self._page(full=True)
def minimal(self) -> HTTPResponse:
return self._page(full=False)
def _generate_body(self, *, full):
lines = []
if full:
_, exc_value, __ = sys.exc_info()
exceptions = []
while exc_value:
exceptions.append(self._format_exc(exc_value))
exc_value = exc_value.__cause__
traceback_html = self.TRACEBACK_BORDER.join(reversed(exceptions))
appname = escape(self.request.app.name)
name = escape(self.exception.__class__.__name__)
value = escape(self.exception)
path = escape(self.request.path)
lines += [
f"<h2>Traceback of {appname} " "(most recent call last):</h2>",
f"{traceback_html}",
"<div class=summary><p>",
f"<b>{name}: {value}</b> "
f"while handling path <code>{path}</code>",
"</div>",
]
for attr, display in (("context", True), ("extra", bool(full))):
info = getattr(self.exception, attr, None)
if info and display:
lines.append(self._generate_object_display(info, attr))
return "\n".join(lines)
def _generate_object_display(
self, obj: t.Dict[str, t.Any], descriptor: str
) -> str:
display = "".join(
self.OBJECT_DISPLAY_HTML.format(key=key, value=value)
for key, value in obj.items()
)
return self.OBJECT_WRAPPER_HTML.format(
title=descriptor.title(),
display_html=display,
obj_type=descriptor.lower(),
)
def _format_exc(self, exc):
frames = extract_tb(exc.__traceback__)
frame_html = "".join(
self.TRACEBACK_LINE_HTML.format(frame) for frame in frames
)
return self.TRACEBACK_WRAPPER_HTML.format(
exc_name=escape(exc.__class__.__name__),
exc_value=escape(exc),
frame_html=frame_html,
)
return self.full()
class TextRenderer(BaseRenderer):
@@ -376,21 +276,18 @@ def escape(text):
return f"{text}".replace("&", "&amp;").replace("<", "&lt;")
RENDERERS_BY_CONFIG = {
"html": HTMLRenderer,
"json": JSONRenderer,
"text": TextRenderer,
MIME_BY_CONFIG = {
"text": "text/plain",
"json": "application/json",
"html": "text/html",
}
CONFIG_BY_MIME = {v: k for k, v in MIME_BY_CONFIG.items()}
RENDERERS_BY_CONTENT_TYPE = {
"text/plain": TextRenderer,
"application/json": JSONRenderer,
"multipart/form-data": HTMLRenderer,
"text/html": HTMLRenderer,
}
CONTENT_TYPE_BY_RENDERERS = {
v: k for k, v in RENDERERS_BY_CONTENT_TYPE.items()
}
# Handler source code is checked for which response types it returns with the
# route error_format="auto" (default) to determine which format to use.
@@ -406,7 +303,7 @@ RESPONSE_MAPPING = {
def check_error_format(format):
if format not in RENDERERS_BY_CONFIG and format != "auto":
if format not in MIME_BY_CONFIG and format != "auto":
raise SanicException(f"Unknown format: {format}")
@@ -421,98 +318,68 @@ def exception_response(
"""
Render a response for the default FALLBACK exception handler.
"""
content_type = None
if not renderer:
# Make sure we have something set
renderer = base
render_format = fallback
if request:
# If there is a request, try and get the format
# from the route
if request.route:
try:
if request.route.extra.error_format:
render_format = request.route.extra.error_format
except AttributeError:
...
content_type = request.headers.getone("content-type", "").split(
";"
)[0]
acceptable = request.accept
# If the format is auto still, make a guess
if render_format == "auto":
# First, if there is an Accept header, check if text/html
# is the first option
# According to MDN Web Docs, all major browsers use text/html
# as the primary value in Accept (with the exception of IE 8,
# and, well, if you are supporting IE 8, then you have bigger
# problems to concern yourself with than what default exception
# renderer is used)
# Source:
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Content_negotiation/List_of_default_Accept_values
if acceptable and acceptable[0].match(
"text/html",
allow_type_wildcard=False,
allow_subtype_wildcard=False,
):
renderer = HTMLRenderer
# Second, if there is an Accept header, check if
# application/json is an option, or if the content-type
# is application/json
elif (
acceptable
and acceptable.match(
"application/json",
allow_type_wildcard=False,
allow_subtype_wildcard=False,
)
or content_type == "application/json"
):
renderer = JSONRenderer
# Third, if there is no Accept header, assume we want text.
# The likely use case here is a raw socket.
elif not acceptable:
renderer = TextRenderer
else:
# Fourth, look to see if there was a JSON body
# When in this situation, the request is probably coming
# from curl, an API client like Postman or Insomnia, or a
# package like requests or httpx
try:
# Give them the benefit of the doubt if they did:
# $ curl localhost:8000 -d '{"foo": "bar"}'
# And provide them with JSONRenderer
renderer = JSONRenderer if request.json else base
except BadRequest:
renderer = base
else:
renderer = RENDERERS_BY_CONFIG.get(render_format, renderer)
# Lastly, if there is an Accept header, make sure
# our choice is okay
if acceptable:
type_ = CONTENT_TYPE_BY_RENDERERS.get(renderer) # type: ignore
if type_ and type_ not in acceptable:
# If the renderer selected is not in the Accept header
# look through what is in the Accept header, and select
# the first option that matches. Otherwise, just drop back
# to the original default
for accept in acceptable:
mtype = f"{accept.type_}/{accept.subtype}"
maybe = RENDERERS_BY_CONTENT_TYPE.get(mtype)
if maybe:
renderer = maybe
break
else:
renderer = base
mt = guess_mime(request, fallback)
renderer = RENDERERS_BY_CONTENT_TYPE.get(mt, base)
renderer = t.cast(t.Type[BaseRenderer], renderer)
return renderer(request, exception, debug).render()
def guess_mime(req: Request, fallback: str) -> str:
# Attempt to find a suitable MIME format for the response.
# Insertion-ordered map of formats["html"] = "source of that suggestion"
formats = {}
name = ""
# Route error_format (by magic from handler code if auto, the default)
if req.route:
name = req.route.name
f = req.route.extra.error_format
if f in MIME_BY_CONFIG:
formats[f] = name
if not formats and fallback in MIME_BY_CONFIG:
formats[fallback] = "FALLBACK_ERROR_FORMAT"
# If still not known, check for the request for clues of JSON
if not formats and fallback == "auto" and req.accept.match(JSON):
if JSON in req.accept: # Literally, not wildcard
formats["json"] = "request.accept"
elif JSON in req.headers.getone("content-type", ""):
formats["json"] = "content-type"
# DEPRECATION: Remove this block in 24.3
else:
c = None
try:
c = req.json
except BadRequest:
pass
if c:
formats["json"] = "request.json"
deprecation(
"Response type was determined by the JSON content of "
"the request. This behavior is deprecated and will be "
"removed in v24.3. Please specify the format either by\n"
f' error_format="json" on route {name}, by\n'
' FALLBACK_ERROR_FORMAT = "json", or by adding header\n'
" accept: application/json to your requests.",
24.3,
)
# Any other supported formats
if fallback == "auto":
for k in MIME_BY_CONFIG:
if k not in formats:
formats[k] = "any"
mimes = [MIME_BY_CONFIG[k] for k in formats]
m = req.accept.match(*mimes)
if m:
format = CONFIG_BY_MIME[m.mime]
source = formats[format]
logger.debug(
f"The client accepts {m.header}, using '{format}' from {source}"
)
else:
logger.debug(f"No format found, the client accepts {req.accept!r}")
return m.mime

View File

@@ -19,7 +19,6 @@ OptionsIterable = Iterable[Tuple[str, str]] # May contain duplicate keys
_token, _quoted = r"([\w!#$%&'*+\-.^_`|~]+)", r'"([^"]*)"'
_param = re.compile(rf";\s*{_token}=(?:{_token}|{_quoted})", re.ASCII)
_firefox_quote_escape = re.compile(r'\\"(?!; |\s*$)')
_ipv6 = "(?:[0-9A-Fa-f]{0,4}:){2,7}[0-9A-Fa-f]{0,4}"
_ipv6_re = re.compile(_ipv6)
_host_re = re.compile(
@@ -33,143 +32,96 @@ _host_re = re.compile(
# For more information, consult ../tests/test_requests.py
def parse_arg_as_accept(f):
def func(self, other, *args, **kwargs):
if not isinstance(other, Accept) and other:
other = Accept.parse(other)
return f(self, other, *args, **kwargs)
return func
class MediaType(str):
def __new__(cls, value: str):
return str.__new__(cls, value)
def __init__(self, value: str) -> None:
self.value = value
self.is_wildcard = self.check_if_wildcard(value)
def __eq__(self, other):
if self.is_wildcard:
return True
if self.match(other):
return True
other_is_wildcard = (
other.is_wildcard
if isinstance(other, MediaType)
else self.check_if_wildcard(other)
)
return other_is_wildcard
def match(self, other):
other_value = other.value if isinstance(other, MediaType) else other
return self.value == other_value
@staticmethod
def check_if_wildcard(value):
return value == "*"
class Accept(str):
def __new__(cls, value: str, *args, **kwargs):
return str.__new__(cls, value)
class MediaType:
"""A media type, as used in the Accept header."""
def __init__(
self,
value: str,
type_: MediaType,
subtype: MediaType,
*,
q: str = "1.0",
**kwargs: str,
type_: str,
subtype: str,
**params: str,
):
qvalue = float(q)
if qvalue > 1 or qvalue < 0:
raise InvalidHeader(
f"Accept header qvalue must be between 0 and 1, not: {qvalue}"
)
self.value = value
self.type_ = type_
self.type = type_
self.subtype = subtype
self.qvalue = qvalue
self.params = kwargs
self.q = float(params.get("q", "1.0"))
self.params = params
self.mime = f"{type_}/{subtype}"
self.key = (
-1 * self.q,
-1 * len(self.params),
self.subtype == "*",
self.type == "*",
)
def _compare(self, other, method):
try:
return method(self.qvalue, other.qvalue)
except (AttributeError, TypeError):
return NotImplemented
def __repr__(self):
return self.mime + "".join(f";{k}={v}" for k, v in self.params.items())
@parse_arg_as_accept
def __lt__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s < o)
def __eq__(self, other):
"""Check for mime (str or MediaType) identical type/subtype.
Parameters such as q are not considered."""
if isinstance(other, str):
# Give a friendly reminder if str contains parameters
if ";" in other:
raise ValueError("Use match() to compare with parameters")
return self.mime == other
if isinstance(other, MediaType):
# Ignore parameters silently with MediaType objects
return self.mime == other.mime
return NotImplemented
@parse_arg_as_accept
def __le__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s <= o)
@parse_arg_as_accept
def __eq__(self, other: Union[str, Accept]): # type: ignore
return self._compare(other, lambda s, o: s == o)
@parse_arg_as_accept
def __ge__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s >= o)
@parse_arg_as_accept
def __gt__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s > o)
@parse_arg_as_accept
def __ne__(self, other: Union[str, Accept]): # type: ignore
return self._compare(other, lambda s, o: s != o)
@parse_arg_as_accept
def match(
self,
other,
*,
allow_type_wildcard: bool = True,
allow_subtype_wildcard: bool = True,
) -> bool:
type_match = (
self.type_ == other.type_
if allow_type_wildcard
else (
self.type_.match(other.type_)
and not self.type_.is_wildcard
and not other.type_.is_wildcard
)
mime_with_params: Union[str, MediaType],
) -> Optional[MediaType]:
"""Check if this media type matches the given mime type/subtype.
Wildcards are supported both ways on both type and subtype.
If mime contains a semicolon, optionally followed by parameters,
the parameters of the two media types must match exactly.
Note: Use the `==` operator instead to check for literal matches
without expanding wildcards.
@param media_type: A type/subtype string to match.
@return `self` if the media types are compatible, else `None`
"""
mt = (
MediaType._parse(mime_with_params)
if isinstance(mime_with_params, str)
else mime_with_params
)
subtype_match = (
self.subtype == other.subtype
if allow_subtype_wildcard
else (
self.subtype.match(other.subtype)
and not self.subtype.is_wildcard
and not other.subtype.is_wildcard
return (
self
if (
mt
# All parameters given in the other media type must match
and all(self.params.get(k) == v for k, v in mt.params.items())
# Subtype match
and (
self.subtype == mt.subtype
or self.subtype == "*"
or mt.subtype == "*"
)
# Type match
and (
self.type == mt.type or self.type == "*" or mt.type == "*"
)
)
else None
)
return type_match and subtype_match
@property
def has_wildcard(self) -> bool:
"""Return True if this media type has a wildcard in it."""
return any(part == "*" for part in (self.subtype, self.type))
@classmethod
def parse(cls, raw: str) -> Accept:
invalid = False
mtype = raw.strip()
def _parse(cls, mime_with_params: str) -> Optional[MediaType]:
mtype = mime_with_params.strip()
if "/" not in mime_with_params:
return None
try:
media, *raw_params = mtype.split(";")
type_, subtype = media.split("/")
except ValueError:
invalid = True
if invalid or not type_ or not subtype:
raise InvalidHeader(f"Header contains invalid Accept value: {raw}")
mime, *raw_params = mtype.split(";")
type_, subtype = mime.split("/", 1)
if not type_ or not subtype:
raise ValueError(f"Invalid media type: {mtype}")
params = dict(
[
@@ -178,46 +130,160 @@ class Accept(str):
]
)
return cls(mtype, MediaType(type_), MediaType(subtype), **params)
return cls(type_.lstrip(), subtype.rstrip(), **params)
class AcceptContainer(list):
def __contains__(self, o: object) -> bool:
return any(item.match(o) for item in self)
class Matched:
"""A matching result of a MIME string against a header."""
def match(
self,
o: object,
*,
allow_type_wildcard: bool = True,
allow_subtype_wildcard: bool = True,
) -> bool:
return any(
item.match(
o,
allow_type_wildcard=allow_type_wildcard,
allow_subtype_wildcard=allow_subtype_wildcard,
def __init__(self, mime: str, header: Optional[MediaType]):
self.mime = mime
self.header = header
def __repr__(self):
return f"<{self} matched {self.header}>" if self else "<no match>"
def __str__(self):
return self.mime
def __bool__(self):
return self.header is not None
def __eq__(self, other: Any) -> bool:
try:
comp, other_accept = self._compare(other)
except TypeError:
return False
return bool(
comp
and (
(self.header and other_accept.header)
or (not self.header and not other_accept.header)
)
for item in self
)
def _compare(self, other) -> Tuple[bool, Matched]:
if isinstance(other, str):
parsed = Matched.parse(other)
if self.mime == other:
return True, parsed
other = parsed
if isinstance(other, Matched):
return self.header == other.header, other
raise TypeError(
"Comparison not supported between unequal "
f"mime types of '{self.mime}' and '{other}'"
)
def match(self, other: Union[str, Matched]) -> Optional[Matched]:
accept = Matched.parse(other) if isinstance(other, str) else other
if not self.header or not accept.header:
return None
if self.header.match(accept.header):
return accept
return None
@classmethod
def parse(cls, raw: str) -> Matched:
media_type = MediaType._parse(raw)
return cls(raw, media_type)
class AcceptList(list):
"""A list of media types, as used in the Accept header.
The Accept header entries are listed in order of preference, starting
with the most preferred. This class is a list of `MediaType` objects,
that encapsulate also the q value or any other parameters.
Two separate methods are provided for searching the list:
- 'match' for finding the most preferred match (wildcards supported)
- operator 'in' for checking explicit matches (wildcards as literals)
"""
def match(self, *mimes: str, accept_wildcards=True) -> Matched:
"""Find a media type accepted by the client.
This method can be used to find which of the media types requested by
the client is most preferred against the ones given as arguments.
The ordering of preference is set by:
1. The order set by RFC 7231, s. 5.3.2, giving a higher priority
to q values and more specific type definitions,
2. The order of the arguments (first is most preferred), and
3. The first matching entry on the Accept header.
Wildcards are matched both ways. A match is usually found, as the
Accept headers typically include `*/*`, in particular if the header
is missing, is not manually set, or if the client is a browser.
Note: the returned object behaves as a string of the mime argument
that matched, and is empty/falsy if no match was found. The matched
header entry `MediaType` or `None` is available as the `m` attribute.
@param mimes: Any MIME types to search for in order of preference.
@param accept_wildcards: Match Accept entries with wildcards in them.
@return A match object with the mime string and the MediaType object.
"""
a = sorted(
(-acc.q, i, j, mime, acc)
for j, acc in enumerate(self)
if accept_wildcards or not acc.has_wildcard
for i, mime in enumerate(mimes)
if acc.match(mime)
)
return Matched(*(a[0][-2:] if a else ("", None)))
def __str__(self):
"""Format as Accept header value (parsed, not original)."""
return ", ".join(str(m) for m in self)
def parse_accept(accept: Optional[str]) -> AcceptList:
"""Parse an Accept header and order the acceptable media types in
according to RFC 7231, s. 5.3.2
https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2
"""
if not accept:
if accept == "":
return AcceptList() # Empty header, accept nothing
accept = "*/*" # No header means that all types are accepted
try:
a = [
mt
for mt in [MediaType._parse(mtype) for mtype in accept.split(",")]
if mt
]
if not a:
raise ValueError
return AcceptList(sorted(a, key=lambda x: x.key))
except ValueError:
raise InvalidHeader(f"Invalid header value in Accept: {accept}")
def parse_content_header(value: str) -> Tuple[str, Options]:
"""Parse content-type and content-disposition header values.
E.g. 'form-data; name=upload; filename=\"file.txt\"' to
E.g. `form-data; name=upload; filename="file.txt"` to
('form-data', {'name': 'upload', 'filename': 'file.txt'})
Mostly identical to cgi.parse_header and werkzeug.parse_options_header
but runs faster and handles special characters better. Unescapes quotes.
but runs faster and handles special characters better.
Unescapes %22 to `"` and %0D%0A to `\n` in field values.
"""
value = _firefox_quote_escape.sub("%22", value)
pos = value.find(";")
if pos == -1:
options: Dict[str, Union[int, str]] = {}
else:
options = {
m.group(1).lower(): m.group(2) or m.group(3).replace("%22", '"')
m.group(1)
.lower(): (m.group(2) or m.group(3))
.replace("%22", '"')
.replace("%0D%0A", "\n")
for m in _param.finditer(value[pos:])
}
value = value[:pos]
@@ -368,34 +434,6 @@ def format_http1_response(status: int, headers: HeaderBytesIterable) -> bytes:
return ret
def _sort_accept_value(accept: Accept):
return (
accept.qvalue,
len(accept.params),
accept.subtype != "*",
accept.type_ != "*",
)
def parse_accept(accept: str) -> AcceptContainer:
"""Parse an Accept header and order the acceptable media types in
accorsing to RFC 7231, s. 5.3.2
https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2
"""
media_types = accept.split(",")
accept_list: List[Accept] = []
for mtype in media_types:
if not mtype:
continue
accept_list.append(Accept.parse(mtype))
return AcceptContainer(
sorted(accept_list, key=_sort_accept_value, reverse=True)
)
def parse_credentials(
header: Optional[str],
prefixes: Union[List, Tuple, Set] = None,

View File

@@ -14,7 +14,6 @@ class MiddlewareMixin(metaclass=SanicMeta):
def __init__(self, *args, **kwargs) -> None:
self._future_middleware: List[FutureMiddleware] = []
self.wrappers = []
def _apply_middleware(self, middleware: FutureMiddleware):
raise NotImplementedError # noqa
@@ -141,7 +140,3 @@ class MiddlewareMixin(metaclass=SanicMeta):
reverse=True,
)[::-1]
)
def wrap(self, handler):
self.wrappers.append(handler)
return handler

View File

@@ -267,11 +267,11 @@ class StartupMixin(metaclass=SanicMeta):
if single_process and legacy:
raise RuntimeError("Cannot run single process and legacy mode")
# if register_sys_signals is False and not (single_process or legacy):
# raise RuntimeError(
# "Cannot run Sanic.serve with register_sys_signals=False. "
# "Use either Sanic.serve_single or Sanic.serve_legacy."
# )
if register_sys_signals is False and not (single_process or legacy):
raise RuntimeError(
"Cannot run Sanic.serve with register_sys_signals=False. "
"Use either Sanic.serve_single or Sanic.serve_legacy."
)
if motd_display:
self.config.MOTD_DISPLAY.update(motd_display)
@@ -877,7 +877,10 @@ class StartupMixin(metaclass=SanicMeta):
sync_manager.shutdown()
for sock in socks:
sock.shutdown(SHUT_RDWR)
try:
sock.shutdown(SHUT_RDWR)
except OSError:
...
sock.close()
socks = []
trigger_events(main_stop, loop, primary)

View File

@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from html5tagger import HTML, Document
from html5tagger import HTML, Builder, Document
from sanic import __version__ as VERSION
from sanic.application.logo import SVG_LOGO_SIMPLE
@@ -9,10 +9,11 @@ from sanic.pages.css import CSS
class BasePage(ABC, metaclass=CSS): # no cov
TITLE = "Sanic"
HEADING = None
CSS: str
doc: Builder
def __init__(self, debug: bool = True) -> None:
self.doc = None
self.debug = debug
@property
@@ -20,7 +21,7 @@ class BasePage(ABC, metaclass=CSS): # no cov
return self.CSS
def render(self) -> str:
self.doc = Document(self.TITLE, lang="en")
self.doc = Document(self.TITLE, lang="en", id="sanic")
self._head()
self._body()
self._foot()
@@ -29,7 +30,7 @@ class BasePage(ABC, metaclass=CSS): # no cov
def _head(self) -> None:
self.doc.style(HTML(self.style))
with self.doc.header:
self.doc.div(self.TITLE)
self.doc.div(self.HEADING or self.TITLE)
def _foot(self) -> None:
with self.doc.footer:
@@ -38,6 +39,23 @@ class BasePage(ABC, metaclass=CSS): # no cov
self._sanic_logo()
if self.debug:
self.doc.div(f"Version {VERSION}")
with self.doc.div:
for idx, (title, href) in enumerate(
(
("Docs", "https://sanic.dev"),
("Help", "https://sanic.dev/en/help.html"),
("GitHub", "https://github.com/sanic-org/sanic"),
)
):
if idx > 0:
self.doc(" | ")
self.doc.a(
title,
href=href,
target="_blank",
referrerpolicy="no-referrer",
)
self.doc.div("DEBUG mode")
@abstractmethod
def _body(self) -> None:

View File

@@ -13,7 +13,12 @@ from .base import BasePage
# Avoid showing the request in the traceback variable inspectors
inspector.blacklist_types += (Request,)
ENDUSER_TEXT = """We're sorry, but it looks like something went wrong. Please try refreshing the page or navigating back to the homepage. If the issue persists, our technical team is working to resolve it as soon as possible. We apologize for the inconvenience and appreciate your patience.""" # noqa: E501
ENDUSER_TEXT = """\
We're sorry, but it looks like something went wrong. Please try refreshing \
the page or navigating back to the homepage. If the issue persists, our \
technical team is working to resolve it as soon as possible. We apologize \
for the inconvenience and appreciate your patience.\
"""
class ErrorPage(BasePage):
@@ -21,29 +26,25 @@ class ErrorPage(BasePage):
def __init__(
self,
debug: bool,
title: str,
text: str,
request: Request,
exc: Exception,
full: bool,
) -> None:
super().__init__()
# Internal server errors come with the text of the exception,
# which we don't want to show to the user.
# FIXME: Needs to be done in a better way, elsewhere
if "Internal Server Error" in title:
text = "The application encountered an unexpected error and could not continue." # noqa: E501
super().__init__(debug)
name = request.app.name.replace("_", " ").strip()
if name.islower():
name = name.title()
self.TITLE = E("Application ").strong(name)(
self.TITLE = f"Application {name} cannot handle your request"
self.HEADING = E("Application ").strong(name)(
" cannot handle your request"
)
self.title = title
self.text = text
self.request = request
self.exc = exc
self.full = full
self.details_open = not getattr(exc, "quiet", False)
def _head(self) -> None:
self.doc._script(tracerite.html.javascript)
@@ -51,10 +52,7 @@ class ErrorPage(BasePage):
def _body(self) -> None:
debug = self.request.app.debug
try:
route_name = self.request.route.name
except AttributeError:
route_name = "[route not found]"
route_name = self.request.name or "[route not found]"
with self.doc.main:
self.doc.h1(f"⚠️ {self.title}").p(self.text)
# Show context details if available on the exception
@@ -70,7 +68,7 @@ class ErrorPage(BasePage):
return
# Show additional details in debug mode,
# open by default for 500 errors
with self.doc.details(open=self.full, class_="smalltext"):
with self.doc.details(open=self.details_open, class_="smalltext"):
# Show extra details if available on the exception
extra = getattr(self.exc, "extra", None)
if extra:
@@ -82,8 +80,11 @@ class ErrorPage(BasePage):
"Details for developers (Sanic debug mode only)"
)
if self.exc:
self.doc.h2(f"Exception in {route_name}:")
self.doc(html_traceback(self.exc, include_js_css=False))
with self.doc.div(class_="exception-wrapper"):
self.doc.h2(f"Exception in {route_name}:")
self.doc(
html_traceback(self.exc, include_js_css=False)
)
self._key_value_table(
f"{self.request.method} {self.request.path}",
@@ -94,12 +95,15 @@ class ErrorPage(BasePage):
def _key_value_table(
self, title: str, table_id: str, data: Mapping[str, Any]
) -> None:
self.doc.h2(title)
with self.doc.dl(id=table_id, class_="key-value-table smalltext"):
for key, value in data.items():
# Reading values may cause a new exception, so suppress it
try:
value = str(value)
except Exception:
value = E.em("Unable to display value")
self.doc.dt.span(key, class_="nobr key").span(": ").dd(value)
with self.doc.div(class_="key-value-display"):
self.doc.h2(title)
with self.doc.dl(id=table_id, class_="key-value-table smalltext"):
for key, value in data.items():
# Reading values may cause a new exception, so suppress it
try:
value = str(value)
except Exception:
value = E.em("Unable to display value")
self.doc.dt.span(key, class_="nobr key").span(": ").dd(
value
)

View File

@@ -2,32 +2,40 @@
:root {
--sanic: #ff0d68;
--sanic-blue: #0092FF;
--sanic-yellow: #FFE900;
--sanic-purple: #833FE3;
--sanic-green: #37ae6f;
--sanic-background: #f1f5f9;
--sanic-text: #1f2937;
--sanic-tab-background: #fff;
--sanic-tab-text: #0f172a;
--sanic-tab-shadow: #adadad;
--sanic-background: #efeced;
--sanic-text: #121010;
--sanic-text-lighter: #756169;
--sanic-link: #ff0d68;
--sanic-block-background: #f7f4f6;
--sanic-block-text: #000;
--sanic-block-alt-text: #6b6468;
--sanic-header-background: #272325;
--sanic-header-border: #fff;
--sanic-header-text: #fff;
--sanic-highlight-background: var(--sanic-yellow);
--sanic-highlight-text: var(--sanic-text);
--sanic-header-background: #000;
--sanic-tab-background: #f7f4f6;
--sanic-tab-shadow: #f7f6f6;
--sanic-tab-text: #222021;
--sanic-tracerite-var: var(--sanic-text);
--sanic-tracerite-val: #ff0d68;
--sanic-tracerite-type: #6d6a6b;
}
@media (prefers-color-scheme: dark) {
:root {
--sanic-purple: #D246DE;
--sanic-green: #16DB93;
--sanic-background: #111;
--sanic-text: #e7e7e7;
--sanic-tab-background: #484848;
--sanic-tab-text: #e1e1e1;
--sanic-tab-shadow: #000;
--sanic-highlight-background: var(--sanic-yellow);
--sanic-highlight-text: #000;
--sanic-header-background: #000;
--sanic-text: #f7f4f6;
--sanic-background: #121010;
--sanic-block-background: #0f0d0e;
--sanic-block-text: #f7f4f6;
--sanic-header-background: #030203;
--sanic-header-border: #000;
--sanic-highlight-text: var(--sanic-background);
--sanic-tab-background: #292728;
--sanic-tab-shadow: #0f0d0e;
--sanic-tab-text: #aea7ab;
}
}
@@ -61,10 +69,10 @@ body>* {
}
main {
min-height: 70vh;
/* Make sure the footer is closer to bottom */
padding: 1rem 2.5rem;
min-height: 70vh;
/* Generous padding for readability */
padding: 1rem 2.5rem;
}
.smalltext {
@@ -77,9 +85,9 @@ main {
}
header {
background: #111;
color: #e1e1e1;
border-bottom: 1px solid #272727;
background: var(--sanic-header-background);
color: var(--sanic-header-text);
border-bottom: 1px solid var(--sanic-header-border);
text-align: center;
}
@@ -88,20 +96,17 @@ footer {
display: flex;
flex-direction: column;
font-size: 0.8rem;
margin-top: 2rem;
margin: 2rem;
line-height: 1.5em;
}
h1 {
text-align: left;
}
a:visited {
color: inherit;
}
a {
text-decoration: none;
color: #88f;
color: var(--sanic-link);
}
a:hover,
@@ -126,3 +131,16 @@ span.icon {
fill: #e1e1e1;
}
}
#sanic pre,
#sanic code {
font-family: "Fira Code",
"Source Code Pro",
Menlo,
Meslo,
Monaco,
Consolas,
Lucida Console,
monospace;
font-size: 0.8rem;
}

View File

@@ -16,14 +16,14 @@
summary {
margin-top: 3em;
color: var(--sanic-blue);
color: var(--sanic-text-lighter);
cursor: pointer;
}
.tracerite {
--tracerite-var: var(--sanic-blue);
--tracerite-val: var(--sanic-text);
--tracerite-type: var(--sanic-green);
--tracerite-var: var(--sanic-tracerite-var);
--tracerite-val: var(--sanic-tracerite-val);
--tracerite-type: var(--sanic-tracerite-type);
--tracerite-exception: var(--sanic);
--tracerite-highlight: var(--sanic-yellow);
--tracerite-tab: var(--sanic-tab-background);
@@ -34,11 +34,11 @@ summary {
margin: 0.5rem 0 !important;
}
.tracerite .traceback-labels button {
font-size: 0.8rem !important;
line-height: 120% !important;
background: var(--tracerite-tab) !important;
color: var(--tracerite-tab-text) !important;
#sanic .tracerite .traceback-labels button {
font-size: 0.8rem;
line-height: 120%;
background: var(--tracerite-tab);
color: var(--tracerite-tab-text);
transition: 0.3s;
cursor: pointer;
}
@@ -51,23 +51,37 @@ summary {
filter: contrast(150%) brightness(120%) drop-shadow(0 -0 2px var(--sanic-tab-shadow));
}
.tracerite .traceback-details mark span {
background: var(--sanic-highlight-background) !important;
color: var(--sanic-highlight-text) !important;
#sanic .tracerite .tracerite-tooltip::before {
bottom: 1.75em;
}
#sanic .tracerite .traceback-details mark span {
background: var(--sanic-highlight-background);
color: var(--sanic-highlight-text);
}
header {
background: var(--sanic-header-background);
}
h1 {
/*margin-left: -1.5rem; !* Emoji partially in the left margin *!*/
h2 {
font-size: 1.3rem;
color: var(--sanic-text);
}
h2 {
margin: 1em 0 0.2em 0;
font-size: 1.25rem;
color: var(--sanic-text);
.key-value-display,
.exception-wrapper {
padding: 0.5rem;
margin-top: 1rem;
}
.key-value-display {
background-color: var(--sanic-block-background);
color: var(--sanic-block-text);
}
.key-value-display h2 {
margin-bottom: 0.2em;
}
dl.key-value-table {
@@ -84,22 +98,11 @@ dl.key-value-table * {
}
dl.key-value-table dt {
color: #888;
color: var(--sanic-block-alt-text);
word-break: break-word;
}
dl.key-value-table dd {
word-break: break-all;
/* Better breaking for cookies header and such */
}
.tracerite .codeline {
font-family:
"Fira Code",
"Source Code Pro",
Menlo,
Monaco,
Consolas,
Lucida Console,
monospace;
word-break: break-all;
}

File diff suppressed because it is too large Load Diff

View File

@@ -130,14 +130,13 @@ def _setup_system_signals(
register_sys_signals: bool,
loop: asyncio.AbstractEventLoop,
) -> None: # no cov
print(">>>>>>>>>>>>>>>>>.", run_multiple)
# Ignore SIGINT when run_multiple
if run_multiple:
signal_func(SIGINT, SIG_IGN)
os.environ["SANIC_WORKER_PROCESS"] = "true"
# Register signals for graceful termination
if register_sys_signals and False:
if register_sys_signals:
if OS_IS_WINDOWS:
ctrlc_workaround_for_windows(app)
else:

View File

@@ -1,2 +1,4 @@
[flake8]
ignore = E203, W503
per-file-ignores =
sanic/app.py:E402

View File

@@ -351,6 +351,7 @@ async def test_websocket_text_receive(send, receive, message_stack):
assert text == msg["text"]
@pytest.mark.asyncio
async def test_websocket_bytes_receive(send, receive, message_stack):
msg = {"bytes": b"hello", "type": "websocket.receive"}
@@ -361,6 +362,7 @@ async def test_websocket_bytes_receive(send, receive, message_stack):
assert data == msg["bytes"]
@pytest.mark.asyncio
async def test_websocket_accept_with_no_subprotocols(
send, receive, message_stack

View File

@@ -1,12 +1,14 @@
import logging
import pytest
from sanic import Sanic
from sanic.config import Config
from sanic.errorpages import HTMLRenderer, exception_response
from sanic.errorpages import TextRenderer, exception_response, guess_mime
from sanic.exceptions import NotFound, SanicException
from sanic.handlers import ErrorHandler
from sanic.request import Request
from sanic.response import HTTPResponse, html, json, text
from sanic.response import HTTPResponse, empty, html, json, text
@pytest.fixture
@@ -17,6 +19,44 @@ def app():
def err(request):
raise Exception("something went wrong")
@app.get("/forced_json/<fail>", error_format="json")
def manual_fail(request, fail):
if fail == "fail":
raise Exception
return html("") # Should be ignored
@app.get("/empty/<fail>")
def empty_fail(request, fail):
if fail == "fail":
raise Exception
return empty()
@app.get("/json/<fail>")
def json_fail(request, fail):
if fail == "fail":
raise Exception
# After 23.3 route format should become json, older versions think it
# is mixed due to empty mapping to html, and don't find any format.
return json({"foo": "bar"}) if fail == "json" else empty()
@app.get("/html/<fail>")
def html_fail(request, fail):
if fail == "fail":
raise Exception
return html("<h1>foo</h1>")
@app.get("/text/<fail>")
def text_fail(request, fail):
if fail == "fail":
raise Exception
return text("foo")
@app.get("/mixed/<param>")
def mixed_fail(request, param):
if param not in ("json", "html"):
raise Exception
return json({}) if param == "json" else html("")
return app
@@ -28,14 +68,14 @@ def fake_request(app):
@pytest.mark.parametrize(
"fallback,content_type, exception, status",
(
(None, "text/html; charset=utf-8", Exception, 500),
(None, "text/plain; charset=utf-8", Exception, 500),
("html", "text/html; charset=utf-8", Exception, 500),
("auto", "text/html; charset=utf-8", Exception, 500),
("auto", "text/plain; charset=utf-8", Exception, 500),
("text", "text/plain; charset=utf-8", Exception, 500),
("json", "application/json", Exception, 500),
(None, "text/html; charset=utf-8", NotFound, 404),
(None, "text/plain; charset=utf-8", NotFound, 404),
("html", "text/html; charset=utf-8", NotFound, 404),
("auto", "text/html; charset=utf-8", NotFound, 404),
("auto", "text/plain; charset=utf-8", NotFound, 404),
("text", "text/plain; charset=utf-8", NotFound, 404),
("json", "application/json", NotFound, 404),
),
@@ -43,6 +83,10 @@ def fake_request(app):
def test_should_return_html_valid_setting(
fake_request, fallback, content_type, exception, status
):
# Note: if fallback is None or "auto", prior to PR #2668 base was returned
# and after that a text response is given because it matches */*. Changed
# base to TextRenderer in this test, like it is in Sanic itself, so the
# test passes with either version but still covers everything that it did.
if fallback:
fake_request.app.config.FALLBACK_ERROR_FORMAT = fallback
@@ -53,7 +97,7 @@ def test_should_return_html_valid_setting(
fake_request,
e,
True,
base=HTMLRenderer,
base=TextRenderer,
fallback=fake_request.app.config.FALLBACK_ERROR_FORMAT,
)
@@ -259,15 +303,16 @@ def test_fallback_with_content_type_mismatch_accept(app):
"accept,content_type,expected",
(
(None, None, "text/plain; charset=utf-8"),
("foo/bar", None, "text/html; charset=utf-8"),
("foo/bar", None, "text/plain; charset=utf-8"),
("application/json", None, "application/json"),
("application/json,text/plain", None, "application/json"),
("text/plain,application/json", None, "application/json"),
("text/plain,foo/bar", None, "text/plain; charset=utf-8"),
# Following test is valid after v22.3
# ("text/plain,text/html", None, "text/plain; charset=utf-8"),
("*/*", "foo/bar", "text/html; charset=utf-8"),
("text/plain,text/html", None, "text/plain; charset=utf-8"),
("*/*", "foo/bar", "text/plain; charset=utf-8"),
("*/*", "application/json", "application/json"),
# App wants text/plain but accept has equal entries for it
("text/*,*/plain", None, "text/plain; charset=utf-8"),
),
)
def test_combinations_for_auto(fake_request, accept, content_type, expected):
@@ -286,7 +331,7 @@ def test_combinations_for_auto(fake_request, accept, content_type, expected):
fake_request,
e,
True,
base=HTMLRenderer,
base=TextRenderer,
fallback="auto",
)
@@ -376,3 +421,109 @@ def test_config_fallback_bad_value(app):
message = "Unknown format: fake"
with pytest.raises(SanicException, match=message):
app.config.FALLBACK_ERROR_FORMAT = "fake"
@pytest.mark.parametrize(
"route_format,fallback,accept,expected",
(
(
"json",
"html",
"*/*",
"The client accepts */*, using 'json' from fakeroute",
),
(
"json",
"auto",
"text/html,*/*;q=0.8",
"The client accepts text/html, using 'html' from any",
),
(
"json",
"json",
"text/html,*/*;q=0.8",
"The client accepts */*;q=0.8, using 'json' from fakeroute",
),
(
"",
"html",
"text/*,*/plain",
"The client accepts text/*, using 'html' from FALLBACK_ERROR_FORMAT",
),
(
"",
"json",
"text/*,*/*",
"The client accepts */*, using 'json' from FALLBACK_ERROR_FORMAT",
),
(
"",
"auto",
"*/*,application/json;q=0.5",
"The client accepts */*, using 'json' from request.accept",
),
(
"",
"auto",
"*/*",
"The client accepts */*, using 'json' from content-type",
),
(
"",
"auto",
"text/html,text/plain",
"The client accepts text/plain, using 'text' from any",
),
(
"",
"auto",
"text/html,text/plain;q=0.9",
"The client accepts text/html, using 'html' from any",
),
(
"html",
"json",
"application/xml",
"No format found, the client accepts [application/xml]",
),
("", "auto", "*/*", "The client accepts */*, using 'text' from any"),
("", "", "*/*", "No format found, the client accepts [*/*]"),
# DEPRECATED: remove in 24.3
(
"",
"auto",
"*/*",
"The client accepts */*, using 'json' from request.json",
),
),
)
def test_guess_mime_logging(
caplog, fake_request, route_format, fallback, accept, expected
):
class FakeObject:
pass
fake_request.route = FakeObject()
fake_request.route.name = "fakeroute"
fake_request.route.extra = FakeObject()
fake_request.route.extra.error_format = route_format
if accept is None:
del fake_request.headers["accept"]
else:
fake_request.headers["accept"] = accept
if "content-type" in expected:
fake_request.headers["content-type"] = "application/json"
# Fake JSON content (DEPRECATED: remove in 24.3)
if "request.json" in expected:
fake_request.parsed_json = {"foo": "bar"}
with caplog.at_level(logging.DEBUG, logger="sanic.root"):
guess_mime(fake_request, fallback)
(logmsg,) = [
r.message for r in caplog.records if r.funcName == "guess_mime"
]
assert logmsg == expected

View File

@@ -23,11 +23,11 @@ from sanic.exceptions import (
from sanic.response import text
def dl_to_dict(soup, css_class):
def dl_to_dict(soup, dl_id):
keys, values = [], []
for dl in soup.find_all("dl", {"class": css_class}):
for dl in soup.find_all("dl", {"id": dl_id}):
for dt in dl.find_all("dt"):
keys.append(dt.text.strip())
keys.append(dt.text.split(":", 1)[0])
for dd in dl.find_all("dd"):
values.append(dd.text.strip())
return dict(zip(keys, values))
@@ -194,10 +194,7 @@ def test_handled_unhandled_exception(exception_app):
assert "Internal Server Error" in soup.h1.text
message = " ".join(soup.p.text.split())
assert message == (
"The server encountered an internal error and "
"cannot complete your request."
)
assert "The application encountered an unexpected error" in message
def test_exception_in_exception_handler(exception_app):
@@ -299,7 +296,7 @@ def test_contextual_exception_context(debug):
_, response = app.test_client.post("/coffee/html", debug=debug)
soup = BeautifulSoup(response.body, "html.parser")
dl = dl_to_dict(soup, "context")
dl = dl_to_dict(soup, "exception-context")
assert response.status == 418
assert "Sorry, I cannot brew coffee" in soup.find("p").text
assert dl == {"foo": "bar"}
@@ -340,7 +337,7 @@ def test_contextual_exception_extra(debug):
_, response = app.test_client.post("/coffee/html", debug=debug)
soup = BeautifulSoup(response.body, "html.parser")
dl = dl_to_dict(soup, "extra")
dl = dl_to_dict(soup, "exception-extra")
assert response.status == 418
assert "Found bar" in soup.find("p").text
if debug:

View File

@@ -123,10 +123,10 @@ def test_html_traceback_output_in_debug_mode(exception_handler_app: Sanic):
assert "handler_4" in html
assert "foo = bar" in html
summary_text = " ".join(soup.select(".summary")[0].text.split())
assert (
"NameError: name 'bar' is not defined while handling path /4"
) == summary_text
summary_text = soup.select("h3")[0].text
assert "NameError: name 'bar' is not defined" == summary_text
request_text = soup.select("h2")[-1].text
assert "GET /4" == request_text
def test_inherited_exception_handler(exception_handler_app: Sanic):
@@ -146,11 +146,10 @@ def test_chained_exception_handler(exception_handler_app: Sanic):
assert "handler_6" in html
assert "foo = 1 / arg" in html
assert "ValueError" in html
assert "GET /6" in html
summary_text = " ".join(soup.select(".summary")[0].text.split())
assert (
"ZeroDivisionError: division by zero while handling path /6/0"
) == summary_text
summary_text = soup.select("h3")[0].text
assert "ZeroDivisionError: division by zero" == summary_text
def test_exception_handler_lookup(exception_handler_app: Sanic):

View File

@@ -2,12 +2,16 @@ from unittest.mock import Mock
import pytest
from sanic import headers, text
from sanic import Sanic, headers, json, text
from sanic.exceptions import InvalidHeader, PayloadTooLarge
from sanic.http import Http
from sanic.request import Request
def make_request(headers) -> Request:
return Request(b"/", headers, "1.1", "GET", None, None)
@pytest.fixture
def raised_ceiling():
Http.HEADER_CEILING = 32_768
@@ -45,29 +49,17 @@ def raised_ceiling():
("attachment", {"filename": "strange;name", "size": "123"}),
),
(
'form-data; name="files"; filename="fo\\"o;bar\\"',
("form-data", {"name": "files", "filename": 'fo"o;bar\\'})
# cgi.parse_header:
# ('form-data', {'name': 'files', 'filename': 'fo"o;bar\\'})
# werkzeug.parse_options_header:
# ('form-data', {'name': 'files', 'filename': '"fo\\"o', 'bar\\"': None})
'form-data; name="foo"; value="%22\\%0D%0A"',
("form-data", {"name": "foo", "value": '"\\\n'}),
),
# <input type=file name="foo&quot;;bar\"> with Unicode filename!
(
# Chrome:
# Chrome, Firefox:
# Content-Disposition: form-data; name="foo%22;bar\"; filename="😀"
'form-data; name="foo%22;bar\\"; filename="😀"',
("form-data", {"name": 'foo";bar\\', "filename": "😀"})
# cgi: ('form-data', {'name': 'foo%22;bar"; filename="😀'})
# werkzeug: ('form-data', {'name': 'foo%22;bar"; filename='})
),
(
# Firefox:
# Content-Disposition: form-data; name="foo\";bar\"; filename="😀"
'form-data; name="foo\\";bar\\"; filename="😀"',
("form-data", {"name": 'foo";bar\\', "filename": "😀"})
# cgi: ('form-data', {'name': 'foo";bar"; filename="😀'})
# werkzeug: ('form-data', {'name': 'foo";bar"; filename='})
# werkzeug (pre 2.3.0): ('form-data', {'name': 'foo%22;bar"; filename='})
),
],
)
@@ -187,27 +179,24 @@ def test_request_line(app):
@pytest.mark.parametrize(
"raw",
"raw,expected_subtype",
(
"show/first, show/second",
"show/*, show/first",
"*/*, show/first",
"*/*, show/*",
"other/*; q=0.1, show/*; q=0.2",
"show/first; q=0.5, show/second; q=0.5",
"show/first; foo=bar, show/second; foo=bar",
"show/second, show/first; foo=bar",
"show/second; q=0.5, show/first; foo=bar; q=0.5",
"show/second; q=0.5, show/first; q=1.0",
"show/first, show/second; q=1.0",
("show/first, show/second", "first"),
("show/*, show/first", "first"),
("*/*, show/first", "first"),
("*/*, show/*", "*"),
("other/*; q=0.1, show/*; q=0.2", "*"),
("show/first; q=0.5, show/second; q=0.5", "first"),
("show/first; foo=bar, show/second; foo=bar", "first"),
("show/second, show/first; foo=bar", "first"),
("show/second; q=0.5, show/first; foo=bar; q=0.5", "first"),
("show/second; q=0.5, show/first; q=1.0", "first"),
("show/first, show/second; q=1.0", "second"),
),
)
def test_parse_accept_ordered_okay(raw):
def test_parse_accept_ordered_okay(raw, expected_subtype):
ordered = headers.parse_accept(raw)
expected_subtype = (
"*" if all(q.subtype.is_wildcard for q in ordered) else "first"
)
assert ordered[0].type_ == "show"
assert ordered[0].type == "show"
assert ordered[0].subtype == expected_subtype
@@ -217,6 +206,7 @@ def test_parse_accept_ordered_okay(raw):
"missing",
"missing/",
"/missing",
"/",
),
)
def test_bad_accept(raw):
@@ -225,128 +215,83 @@ def test_bad_accept(raw):
def test_empty_accept():
assert headers.parse_accept("") == []
a = headers.parse_accept("")
assert a == []
assert not a.match("*/*")
def test_wildcard_accept_set_ok():
accept = headers.parse_accept("*/*")[0]
assert accept.type_.is_wildcard
assert accept.subtype.is_wildcard
assert accept.type == "*"
assert accept.subtype == "*"
assert accept.has_wildcard
accept = headers.parse_accept("foo/*")[0]
assert accept.type == "foo"
assert accept.subtype == "*"
assert accept.has_wildcard
accept = headers.parse_accept("foo/bar")[0]
assert not accept.type_.is_wildcard
assert not accept.subtype.is_wildcard
assert accept.type == "foo"
assert accept.subtype == "bar"
assert not accept.has_wildcard
def test_accept_parsed_against_str():
accept = headers.Accept.parse("foo/bar")
assert accept > "foo/bar; q=0.1"
def test_media_type_equality():
assert headers.MediaType("foo") == headers.MediaType("foo") == "foo"
assert headers.MediaType("foo") == headers.MediaType("*") == "*"
assert headers.MediaType("foo") != headers.MediaType("bar")
assert headers.MediaType("foo") != "bar"
accept = headers.Matched.parse("foo/bar")
assert accept == "foo/bar; q=0.1"
def test_media_type_matching():
assert headers.MediaType("foo").match(headers.MediaType("foo"))
assert headers.MediaType("foo").match("foo")
assert not headers.MediaType("foo").match(headers.MediaType("*"))
assert not headers.MediaType("foo").match("*")
assert not headers.MediaType("foo").match(headers.MediaType("bar"))
assert not headers.MediaType("foo").match("bar")
assert headers.MediaType("foo", "bar").match(
headers.MediaType("foo", "bar")
)
assert headers.MediaType("foo", "bar").match("foo/bar")
@pytest.mark.parametrize(
"value,other,outcome,allow_type,allow_subtype",
"value,other,outcome",
(
# ALLOW BOTH
("foo/bar", "foo/bar", True, True, True),
("foo/bar", headers.Accept.parse("foo/bar"), True, True, True),
("foo/bar", "foo/*", True, True, True),
("foo/bar", headers.Accept.parse("foo/*"), True, True, True),
("foo/bar", "*/*", True, True, True),
("foo/bar", headers.Accept.parse("*/*"), True, True, True),
("foo/*", "foo/bar", True, True, True),
("foo/*", headers.Accept.parse("foo/bar"), True, True, True),
("foo/*", "foo/*", True, True, True),
("foo/*", headers.Accept.parse("foo/*"), True, True, True),
("foo/*", "*/*", True, True, True),
("foo/*", headers.Accept.parse("*/*"), True, True, True),
("*/*", "foo/bar", True, True, True),
("*/*", headers.Accept.parse("foo/bar"), True, True, True),
("*/*", "foo/*", True, True, True),
("*/*", headers.Accept.parse("foo/*"), True, True, True),
("*/*", "*/*", True, True, True),
("*/*", headers.Accept.parse("*/*"), True, True, True),
# ALLOW TYPE
("foo/bar", "foo/bar", True, True, False),
("foo/bar", headers.Accept.parse("foo/bar"), True, True, False),
("foo/bar", "foo/*", False, True, False),
("foo/bar", headers.Accept.parse("foo/*"), False, True, False),
("foo/bar", "*/*", False, True, False),
("foo/bar", headers.Accept.parse("*/*"), False, True, False),
("foo/*", "foo/bar", False, True, False),
("foo/*", headers.Accept.parse("foo/bar"), False, True, False),
("foo/*", "foo/*", False, True, False),
("foo/*", headers.Accept.parse("foo/*"), False, True, False),
("foo/*", "*/*", False, True, False),
("foo/*", headers.Accept.parse("*/*"), False, True, False),
("*/*", "foo/bar", False, True, False),
("*/*", headers.Accept.parse("foo/bar"), False, True, False),
("*/*", "foo/*", False, True, False),
("*/*", headers.Accept.parse("foo/*"), False, True, False),
("*/*", "*/*", False, True, False),
("*/*", headers.Accept.parse("*/*"), False, True, False),
# ALLOW SUBTYPE
("foo/bar", "foo/bar", True, False, True),
("foo/bar", headers.Accept.parse("foo/bar"), True, False, True),
("foo/bar", "foo/*", True, False, True),
("foo/bar", headers.Accept.parse("foo/*"), True, False, True),
("foo/bar", "*/*", False, False, True),
("foo/bar", headers.Accept.parse("*/*"), False, False, True),
("foo/*", "foo/bar", True, False, True),
("foo/*", headers.Accept.parse("foo/bar"), True, False, True),
("foo/*", "foo/*", True, False, True),
("foo/*", headers.Accept.parse("foo/*"), True, False, True),
("foo/*", "*/*", False, False, True),
("foo/*", headers.Accept.parse("*/*"), False, False, True),
("*/*", "foo/bar", False, False, True),
("*/*", headers.Accept.parse("foo/bar"), False, False, True),
("*/*", "foo/*", False, False, True),
("*/*", headers.Accept.parse("foo/*"), False, False, True),
("*/*", "*/*", False, False, True),
("*/*", headers.Accept.parse("*/*"), False, False, True),
("foo/bar", "foo/bar", True),
("foo/bar", headers.Matched.parse("foo/bar"), True),
("foo/bar", "foo/*", True),
("foo/bar", headers.Matched.parse("foo/*"), True),
("foo/bar", "*/*", True),
("foo/bar", headers.Matched.parse("*/*"), True),
("foo/*", "foo/bar", True),
("foo/*", headers.Matched.parse("foo/bar"), True),
("foo/*", "foo/*", True),
("foo/*", headers.Matched.parse("foo/*"), True),
("foo/*", "*/*", True),
("foo/*", headers.Matched.parse("*/*"), True),
("*/*", "foo/bar", True),
("*/*", headers.Matched.parse("foo/bar"), True),
("*/*", "foo/*", True),
("*/*", headers.Matched.parse("foo/*"), True),
("*/*", "*/*", True),
("*/*", headers.Matched.parse("*/*"), True),
),
)
def test_accept_matching(value, other, outcome, allow_type, allow_subtype):
assert (
headers.Accept.parse(value).match(
other,
allow_type_wildcard=allow_type,
allow_subtype_wildcard=allow_subtype,
)
is outcome
)
def test_accept_matching(value, other, outcome):
assert bool(headers.Matched.parse(value).match(other)) is outcome
@pytest.mark.parametrize("value", ("foo/bar", "foo/*", "*/*"))
def test_value_in_accept(value):
acceptable = headers.parse_accept(value)
assert "foo/bar" in acceptable
assert "foo/*" in acceptable
assert "*/*" in acceptable
assert acceptable.match("foo/bar")
assert acceptable.match("foo/*")
assert acceptable.match("*/*")
@pytest.mark.parametrize("value", ("foo/bar", "foo/*"))
def test_value_not_in_accept(value):
acceptable = headers.parse_accept(value)
assert "no/match" not in acceptable
assert "no/*" not in acceptable
assert not acceptable.match("no/match")
assert not acceptable.match("no/*")
assert "*/*" not in acceptable
assert "*/bar" not in acceptable
@pytest.mark.parametrize(
@@ -365,6 +310,160 @@ def test_value_not_in_accept(value):
),
),
)
def test_browser_headers(header, expected):
def test_browser_headers_general(header, expected):
request = Request(b"/", {"accept": header}, "1.1", "GET", None, None)
assert request.accept == expected
assert [str(item) for item in request.accept] == expected
@pytest.mark.parametrize(
"header,expected",
(
(
"text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", # noqa: E501
[
("text/html", 1.0),
("application/xhtml+xml", 1.0),
("image/avif", 1.0),
("image/webp", 1.0),
("application/xml", 0.9),
("*/*", 0.8),
],
),
),
)
def test_browser_headers_specific(header, expected):
mimes = [e[0] for e in expected]
qs = [e[1] for e in expected]
request = Request(b"/", {"accept": header}, "1.1", "GET", None, None)
assert request.accept == mimes
for a, m, q in zip(request.accept, mimes, qs):
assert a == m
assert a.mime == m
assert a.q == q
@pytest.mark.parametrize(
"raw",
(
"text/html, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8",
"application/xml;q=0.9, */*;q=0.8, text/html, application/xhtml+xml",
(
"foo/bar;q=0.9, */*;q=0.8, text/html=0.8, "
"text/plain, application/xhtml+xml"
),
),
)
def test_accept_ordering(raw):
"""Should sort by q but also be stable."""
accept = headers.parse_accept(raw)
assert accept[0].type == "text"
raw1 = ", ".join(str(a) for a in accept)
accept = headers.parse_accept(raw1)
raw2 = ", ".join(str(a) for a in accept)
assert raw1 == raw2
def test_not_accept_wildcard():
accept = headers.parse_accept("*/*, foo/*, */bar, foo/bar;q=0.1")
assert not accept.match(
"text/html", "foo/foo", "bar/bar", accept_wildcards=False
)
# Should ignore wildcards in accept but still matches them from mimes
m = accept.match("text/plain", "*/*", accept_wildcards=False)
assert m.mime == "*/*"
assert m.match("*/*")
assert m.header == "foo/bar"
assert not accept.match(
"text/html", "foo/foo", "bar/bar", accept_wildcards=False
)
def test_accept_misc():
header = (
"foo/bar;q=0.0, */plain;param=123, text/plain, text/*, foo/bar;q=0.5"
)
a = headers.parse_accept(header)
assert repr(a) == (
"[*/plain;param=123, text/plain, text/*, "
"foo/bar;q=0.5, foo/bar;q=0.0]"
) # noqa: E501
assert str(a) == (
"*/plain;param=123, text/plain, text/*, "
"foo/bar;q=0.5, foo/bar;q=0.0"
) # noqa: E501
# q=1 types don't match foo/bar but match the two others,
# text/* comes first and matches */plain because it
# comes first in the header
m = a.match("foo/bar", "text/*", "text/plain")
assert repr(m) == "<text/* matched */plain;param=123>"
assert m == "text/*"
assert m.mime == "text/*"
assert m.header.mime == "*/plain"
assert m.header.type == "*"
assert m.header.subtype == "plain"
assert m.header.q == 1.0
assert m.header.params == dict(param="123")
# Matches object against another Matched object (by mime and header)
assert m == a.match("text/*")
# Against unsupported type falls back to object id matching
assert m != 123
# Matches the highest q value
m = a.match("foo/bar")
assert repr(m) == "<foo/bar matched foo/bar;q=0.5>"
assert m == "foo/bar"
assert m == "foo/bar;q=0.5"
# Matching nothing special case
m = a.match()
assert m == ""
assert m.header is None
# No header means anything
a = headers.parse_accept(None)
assert a == ["*/*"]
assert a.match("foo/bar")
# Empty header means nothing
a = headers.parse_accept("")
assert a == []
assert not a.match("foo/bar")
@pytest.mark.parametrize(
"headers,expected",
(
({"foo": "bar"}, "bar"),
((("foo", "bar"), ("foo", "baz")), "bar,baz"),
({}, ""),
),
)
def test_field_simple_accessor(headers, expected):
request = make_request(headers)
assert request.headers.foo == request.headers.foo_ == expected
@pytest.mark.parametrize(
"headers,expected",
(
({"foo-bar": "bar"}, "bar"),
((("foo-bar", "bar"), ("foo-bar", "baz")), "bar,baz"),
),
)
def test_field_hyphenated_accessor(headers, expected):
request = make_request(headers)
assert request.headers.foo_bar == request.headers.foo_bar_ == expected
def test_bad_accessor():
request = make_request({})
msg = "'Header' object has no attribute '_foo'"
with pytest.raises(AttributeError, match=msg):
request.headers._foo
def test_multiple_fields_accessor(app: Sanic):
@app.get("")
async def handler(request: Request):
return json({"field": request.headers.example_field})
_, response = app.test_client.get(
"/", headers=(("Example-Field", "Foo, Bar"), ("Example-Field", "Baz"))
)
assert response.json["field"] == "Foo, Bar,Baz"

View File

@@ -150,33 +150,47 @@ def test_request_accept():
async def get(request):
return response.empty()
header_value = "text/plain;format=flowed, text/plain, text/*, */*"
request, _ = app.test_client.get(
"/",
headers={
"Accept": "text/*, text/plain, text/plain;format=flowed, */*"
},
headers={"Accept": header_value},
)
assert request.accept == [
assert str(request.accept) == header_value
match = request.accept.match(
"*/*;format=flowed",
"text/plain;format=flowed",
"text/plain",
"text/*",
"*/*",
]
)
assert match == "*/*;format=flowed"
assert match.header.mime == "text/plain"
assert match.header.params == {"format": "flowed"}
header_value = (
"text/plain; q=0.5, text/html, text/x-dvi; q=0.8, text/x-c"
)
request, _ = app.test_client.get(
"/",
headers={
"Accept": (
"text/plain; q=0.5, text/html, text/x-dvi; q=0.8, text/x-c"
)
},
headers={"Accept": header_value},
)
assert request.accept == [
assert [str(i) for i in request.accept] == [
"text/html",
"text/x-c",
"text/x-dvi; q=0.8",
"text/plain; q=0.5",
"text/x-dvi;q=0.8",
"text/plain;q=0.5",
]
match = request.accept.match(
"application/json",
"text/plain", # Has lower q in accept header
"text/html;format=flowed", # Params mismatch
"text/*", # Matches
"*/*",
)
assert match == "text/*"
assert match.header.mime == "text/html"
assert match.header.q == 1.0
assert not match.header.params
def test_bad_url_parse():