Refresh Request.accept functionality (#2687)
This commit is contained in:
@@ -470,10 +470,8 @@ def exception_response(
|
||||
# Source:
|
||||
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Content_negotiation/List_of_default_Accept_values
|
||||
|
||||
if acceptable and acceptable[0].match(
|
||||
"text/html",
|
||||
allow_type_wildcard=False,
|
||||
allow_subtype_wildcard=False,
|
||||
if acceptable and acceptable.match(
|
||||
"text/html", accept_wildcards=False
|
||||
):
|
||||
renderer = HTMLRenderer
|
||||
|
||||
@@ -483,9 +481,7 @@ def exception_response(
|
||||
elif (
|
||||
acceptable
|
||||
and acceptable.match(
|
||||
"application/json",
|
||||
allow_type_wildcard=False,
|
||||
allow_subtype_wildcard=False,
|
||||
"application/json", accept_wildcards=False
|
||||
)
|
||||
or content_type == "application/json"
|
||||
):
|
||||
@@ -514,13 +510,13 @@ def exception_response(
|
||||
# our choice is okay
|
||||
if acceptable:
|
||||
type_ = CONTENT_TYPE_BY_RENDERERS.get(renderer) # type: ignore
|
||||
if type_ and type_ not in acceptable:
|
||||
if type_ and not acceptable.match(type_):
|
||||
# If the renderer selected is not in the Accept header
|
||||
# look through what is in the Accept header, and select
|
||||
# the first option that matches. Otherwise, just drop back
|
||||
# to the original default
|
||||
for accept in acceptable:
|
||||
mtype = f"{accept.type_}/{accept.subtype}"
|
||||
mtype = f"{accept.type}/{accept.subtype}"
|
||||
maybe = RENDERERS_BY_CONTENT_TYPE.get(mtype)
|
||||
if maybe:
|
||||
renderer = maybe
|
||||
|
||||
366
sanic/headers.py
366
sanic/headers.py
@@ -33,143 +33,96 @@ _host_re = re.compile(
|
||||
# For more information, consult ../tests/test_requests.py
|
||||
|
||||
|
||||
def parse_arg_as_accept(f):
|
||||
def func(self, other, *args, **kwargs):
|
||||
if not isinstance(other, Accept) and other:
|
||||
other = Accept.parse(other)
|
||||
return f(self, other, *args, **kwargs)
|
||||
|
||||
return func
|
||||
|
||||
|
||||
class MediaType(str):
|
||||
def __new__(cls, value: str):
|
||||
return str.__new__(cls, value)
|
||||
|
||||
def __init__(self, value: str) -> None:
|
||||
self.value = value
|
||||
self.is_wildcard = self.check_if_wildcard(value)
|
||||
|
||||
def __eq__(self, other):
|
||||
if self.is_wildcard:
|
||||
return True
|
||||
|
||||
if self.match(other):
|
||||
return True
|
||||
|
||||
other_is_wildcard = (
|
||||
other.is_wildcard
|
||||
if isinstance(other, MediaType)
|
||||
else self.check_if_wildcard(other)
|
||||
)
|
||||
|
||||
return other_is_wildcard
|
||||
|
||||
def match(self, other):
|
||||
other_value = other.value if isinstance(other, MediaType) else other
|
||||
return self.value == other_value
|
||||
|
||||
@staticmethod
|
||||
def check_if_wildcard(value):
|
||||
return value == "*"
|
||||
|
||||
|
||||
class Accept(str):
|
||||
def __new__(cls, value: str, *args, **kwargs):
|
||||
return str.__new__(cls, value)
|
||||
class MediaType:
|
||||
"""A media type, as used in the Accept header."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
value: str,
|
||||
type_: MediaType,
|
||||
subtype: MediaType,
|
||||
*,
|
||||
q: str = "1.0",
|
||||
**kwargs: str,
|
||||
type_: str,
|
||||
subtype: str,
|
||||
**params: str,
|
||||
):
|
||||
qvalue = float(q)
|
||||
if qvalue > 1 or qvalue < 0:
|
||||
raise InvalidHeader(
|
||||
f"Accept header qvalue must be between 0 and 1, not: {qvalue}"
|
||||
)
|
||||
self.value = value
|
||||
self.type_ = type_
|
||||
self.type = type_
|
||||
self.subtype = subtype
|
||||
self.qvalue = qvalue
|
||||
self.params = kwargs
|
||||
self.q = float(params.get("q", "1.0"))
|
||||
self.params = params
|
||||
self.mime = f"{type_}/{subtype}"
|
||||
self.key = (
|
||||
-1 * self.q,
|
||||
-1 * len(self.params),
|
||||
self.subtype == "*",
|
||||
self.type == "*",
|
||||
)
|
||||
|
||||
def _compare(self, other, method):
|
||||
try:
|
||||
return method(self.qvalue, other.qvalue)
|
||||
except (AttributeError, TypeError):
|
||||
return NotImplemented
|
||||
def __repr__(self):
|
||||
return self.mime + "".join(f";{k}={v}" for k, v in self.params.items())
|
||||
|
||||
@parse_arg_as_accept
|
||||
def __lt__(self, other: Union[str, Accept]):
|
||||
return self._compare(other, lambda s, o: s < o)
|
||||
def __eq__(self, other):
|
||||
"""Check for mime (str or MediaType) identical type/subtype.
|
||||
Parameters such as q are not considered."""
|
||||
if isinstance(other, str):
|
||||
# Give a friendly reminder if str contains parameters
|
||||
if ";" in other:
|
||||
raise ValueError("Use match() to compare with parameters")
|
||||
return self.mime == other
|
||||
if isinstance(other, MediaType):
|
||||
# Ignore parameters silently with MediaType objects
|
||||
return self.mime == other.mime
|
||||
return NotImplemented
|
||||
|
||||
@parse_arg_as_accept
|
||||
def __le__(self, other: Union[str, Accept]):
|
||||
return self._compare(other, lambda s, o: s <= o)
|
||||
|
||||
@parse_arg_as_accept
|
||||
def __eq__(self, other: Union[str, Accept]): # type: ignore
|
||||
return self._compare(other, lambda s, o: s == o)
|
||||
|
||||
@parse_arg_as_accept
|
||||
def __ge__(self, other: Union[str, Accept]):
|
||||
return self._compare(other, lambda s, o: s >= o)
|
||||
|
||||
@parse_arg_as_accept
|
||||
def __gt__(self, other: Union[str, Accept]):
|
||||
return self._compare(other, lambda s, o: s > o)
|
||||
|
||||
@parse_arg_as_accept
|
||||
def __ne__(self, other: Union[str, Accept]): # type: ignore
|
||||
return self._compare(other, lambda s, o: s != o)
|
||||
|
||||
@parse_arg_as_accept
|
||||
def match(
|
||||
self,
|
||||
other,
|
||||
*,
|
||||
allow_type_wildcard: bool = True,
|
||||
allow_subtype_wildcard: bool = True,
|
||||
) -> bool:
|
||||
type_match = (
|
||||
self.type_ == other.type_
|
||||
if allow_type_wildcard
|
||||
else (
|
||||
self.type_.match(other.type_)
|
||||
and not self.type_.is_wildcard
|
||||
and not other.type_.is_wildcard
|
||||
)
|
||||
mime_with_params: Union[str, MediaType],
|
||||
) -> Optional[MediaType]:
|
||||
"""Check if this media type matches the given mime type/subtype.
|
||||
Wildcards are supported both ways on both type and subtype.
|
||||
If mime contains a semicolon, optionally followed by parameters,
|
||||
the parameters of the two media types must match exactly.
|
||||
Note: Use the `==` operator instead to check for literal matches
|
||||
without expanding wildcards.
|
||||
@param media_type: A type/subtype string to match.
|
||||
@return `self` if the media types are compatible, else `None`
|
||||
"""
|
||||
mt = (
|
||||
MediaType._parse(mime_with_params)
|
||||
if isinstance(mime_with_params, str)
|
||||
else mime_with_params
|
||||
)
|
||||
subtype_match = (
|
||||
self.subtype == other.subtype
|
||||
if allow_subtype_wildcard
|
||||
else (
|
||||
self.subtype.match(other.subtype)
|
||||
and not self.subtype.is_wildcard
|
||||
and not other.subtype.is_wildcard
|
||||
return (
|
||||
self
|
||||
if (
|
||||
mt
|
||||
# All parameters given in the other media type must match
|
||||
and all(self.params.get(k) == v for k, v in mt.params.items())
|
||||
# Subtype match
|
||||
and (
|
||||
self.subtype == mt.subtype
|
||||
or self.subtype == "*"
|
||||
or mt.subtype == "*"
|
||||
)
|
||||
# Type match
|
||||
and (
|
||||
self.type == mt.type or self.type == "*" or mt.type == "*"
|
||||
)
|
||||
)
|
||||
else None
|
||||
)
|
||||
|
||||
return type_match and subtype_match
|
||||
@property
|
||||
def has_wildcard(self) -> bool:
|
||||
"""Return True if this media type has a wildcard in it."""
|
||||
return any(part == "*" for part in (self.subtype, self.type))
|
||||
|
||||
@classmethod
|
||||
def parse(cls, raw: str) -> Accept:
|
||||
invalid = False
|
||||
mtype = raw.strip()
|
||||
def _parse(cls, mime_with_params: str) -> Optional[MediaType]:
|
||||
mtype = mime_with_params.strip()
|
||||
if "/" not in mime_with_params:
|
||||
return None
|
||||
|
||||
try:
|
||||
media, *raw_params = mtype.split(";")
|
||||
type_, subtype = media.split("/")
|
||||
except ValueError:
|
||||
invalid = True
|
||||
|
||||
if invalid or not type_ or not subtype:
|
||||
raise InvalidHeader(f"Header contains invalid Accept value: {raw}")
|
||||
mime, *raw_params = mtype.split(";")
|
||||
type_, subtype = mime.split("/", 1)
|
||||
if not type_ or not subtype:
|
||||
raise ValueError(f"Invalid media type: {mtype}")
|
||||
|
||||
params = dict(
|
||||
[
|
||||
@@ -178,29 +131,140 @@ class Accept(str):
|
||||
]
|
||||
)
|
||||
|
||||
return cls(mtype, MediaType(type_), MediaType(subtype), **params)
|
||||
return cls(type_.lstrip(), subtype.rstrip(), **params)
|
||||
|
||||
|
||||
class AcceptContainer(list):
|
||||
def __contains__(self, o: object) -> bool:
|
||||
return any(item.match(o) for item in self)
|
||||
class Matched:
|
||||
"""A matching result of a MIME string against a header."""
|
||||
|
||||
def match(
|
||||
self,
|
||||
o: object,
|
||||
*,
|
||||
allow_type_wildcard: bool = True,
|
||||
allow_subtype_wildcard: bool = True,
|
||||
) -> bool:
|
||||
return any(
|
||||
item.match(
|
||||
o,
|
||||
allow_type_wildcard=allow_type_wildcard,
|
||||
allow_subtype_wildcard=allow_subtype_wildcard,
|
||||
def __init__(self, mime: str, header: Optional[MediaType]):
|
||||
self.mime = mime
|
||||
self.header = header
|
||||
|
||||
def __repr__(self):
|
||||
return f"<{self} matched {self.header}>" if self else "<no match>"
|
||||
|
||||
def __str__(self):
|
||||
return self.mime
|
||||
|
||||
def __bool__(self):
|
||||
return self.header is not None
|
||||
|
||||
def __eq__(self, other: Any) -> bool:
|
||||
try:
|
||||
comp, other_accept = self._compare(other)
|
||||
except TypeError:
|
||||
return False
|
||||
|
||||
return bool(
|
||||
comp
|
||||
and (
|
||||
(self.header and other_accept.header)
|
||||
or (not self.header and not other_accept.header)
|
||||
)
|
||||
for item in self
|
||||
)
|
||||
|
||||
def _compare(self, other) -> Tuple[bool, Matched]:
|
||||
if isinstance(other, str):
|
||||
# return self.mime == other, Accept.parse(other)
|
||||
parsed = Matched.parse(other)
|
||||
if self.mime == other:
|
||||
return True, parsed
|
||||
other = parsed
|
||||
|
||||
if isinstance(other, Matched):
|
||||
return self.header == other.header, other
|
||||
|
||||
raise TypeError(
|
||||
"Comparison not supported between unequal "
|
||||
f"mime types of '{self.mime}' and '{other}'"
|
||||
)
|
||||
|
||||
def match(self, other: Union[str, Matched]) -> Optional[Matched]:
|
||||
accept = Matched.parse(other) if isinstance(other, str) else other
|
||||
if not self.header or not accept.header:
|
||||
return None
|
||||
if self.header.match(accept.header):
|
||||
return accept
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def parse(cls, raw: str) -> Matched:
|
||||
media_type = MediaType._parse(raw)
|
||||
return cls(raw, media_type)
|
||||
|
||||
|
||||
class AcceptList(list):
|
||||
"""A list of media types, as used in the Accept header.
|
||||
|
||||
The Accept header entries are listed in order of preference, starting
|
||||
with the most preferred. This class is a list of `MediaType` objects,
|
||||
that encapsulate also the q value or any other parameters.
|
||||
|
||||
Two separate methods are provided for searching the list:
|
||||
- 'match' for finding the most preferred match (wildcards supported)
|
||||
- operator 'in' for checking explicit matches (wildcards as literals)
|
||||
"""
|
||||
|
||||
def match(self, *mimes: str, accept_wildcards=True) -> Matched:
|
||||
"""Find a media type accepted by the client.
|
||||
|
||||
This method can be used to find which of the media types requested by
|
||||
the client is most preferred against the ones given as arguments.
|
||||
|
||||
The ordering of preference is set by:
|
||||
1. The order set by RFC 7231, s. 5.3.2, giving a higher priority
|
||||
to q values and more specific type definitions,
|
||||
2. The order of the arguments (first is most preferred), and
|
||||
3. The first matching entry on the Accept header.
|
||||
|
||||
Wildcards are matched both ways. A match is usually found, as the
|
||||
Accept headers typically include `*/*`, in particular if the header
|
||||
is missing, is not manually set, or if the client is a browser.
|
||||
|
||||
Note: the returned object behaves as a string of the mime argument
|
||||
that matched, and is empty/falsy if no match was found. The matched
|
||||
header entry `MediaType` or `None` is available as the `m` attribute.
|
||||
|
||||
@param mimes: Any MIME types to search for in order of preference.
|
||||
@param accept_wildcards: Match Accept entries with wildcards in them.
|
||||
@return A match object with the mime string and the MediaType object.
|
||||
"""
|
||||
a = sorted(
|
||||
(-acc.q, i, j, mime, acc)
|
||||
for j, acc in enumerate(self)
|
||||
if accept_wildcards or not acc.has_wildcard
|
||||
for i, mime in enumerate(mimes)
|
||||
if acc.match(mime)
|
||||
)
|
||||
return Matched(*(a[0][-2:] if a else ("", None)))
|
||||
|
||||
def __str__(self):
|
||||
"""Format as Accept header value (parsed, not original)."""
|
||||
return ", ".join(str(m) for m in self)
|
||||
|
||||
|
||||
def parse_accept(accept: Optional[str]) -> AcceptList:
|
||||
"""Parse an Accept header and order the acceptable media types in
|
||||
according to RFC 7231, s. 5.3.2
|
||||
https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2
|
||||
"""
|
||||
if not accept:
|
||||
if accept == "":
|
||||
return AcceptList() # Empty header, accept nothing
|
||||
accept = "*/*" # No header means that all types are accepted
|
||||
try:
|
||||
a = [
|
||||
mt
|
||||
for mt in [MediaType._parse(mtype) for mtype in accept.split(",")]
|
||||
if mt
|
||||
]
|
||||
if not a:
|
||||
raise ValueError
|
||||
return AcceptList(sorted(a, key=lambda x: x.key))
|
||||
except ValueError:
|
||||
raise InvalidHeader(f"Invalid header value in Accept: {accept}")
|
||||
|
||||
|
||||
def parse_content_header(value: str) -> Tuple[str, Options]:
|
||||
"""Parse content-type and content-disposition header values.
|
||||
@@ -368,34 +432,6 @@ def format_http1_response(status: int, headers: HeaderBytesIterable) -> bytes:
|
||||
return ret
|
||||
|
||||
|
||||
def _sort_accept_value(accept: Accept):
|
||||
return (
|
||||
accept.qvalue,
|
||||
len(accept.params),
|
||||
accept.subtype != "*",
|
||||
accept.type_ != "*",
|
||||
)
|
||||
|
||||
|
||||
def parse_accept(accept: str) -> AcceptContainer:
|
||||
"""Parse an Accept header and order the acceptable media types in
|
||||
accorsing to RFC 7231, s. 5.3.2
|
||||
https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2
|
||||
"""
|
||||
media_types = accept.split(",")
|
||||
accept_list: List[Accept] = []
|
||||
|
||||
for mtype in media_types:
|
||||
if not mtype:
|
||||
continue
|
||||
|
||||
accept_list.append(Accept.parse(mtype))
|
||||
|
||||
return AcceptContainer(
|
||||
sorted(accept_list, key=_sort_accept_value, reverse=True)
|
||||
)
|
||||
|
||||
|
||||
def parse_credentials(
|
||||
header: Optional[str],
|
||||
prefixes: Union[List, Tuple, Set] = None,
|
||||
|
||||
@@ -47,7 +47,7 @@ from sanic.constants import (
|
||||
)
|
||||
from sanic.exceptions import BadRequest, BadURL, ServerError
|
||||
from sanic.headers import (
|
||||
AcceptContainer,
|
||||
AcceptList,
|
||||
Options,
|
||||
parse_accept,
|
||||
parse_content_header,
|
||||
@@ -167,7 +167,7 @@ class Request:
|
||||
self.conn_info: Optional[ConnInfo] = None
|
||||
self.ctx = SimpleNamespace()
|
||||
self.parsed_forwarded: Optional[Options] = None
|
||||
self.parsed_accept: Optional[AcceptContainer] = None
|
||||
self.parsed_accept: Optional[AcceptList] = None
|
||||
self.parsed_credentials: Optional[Credentials] = None
|
||||
self.parsed_json = None
|
||||
self.parsed_form: Optional[RequestParameters] = None
|
||||
@@ -499,10 +499,10 @@ class Request:
|
||||
return self.parsed_json
|
||||
|
||||
@property
|
||||
def accept(self) -> AcceptContainer:
|
||||
def accept(self) -> AcceptList:
|
||||
"""
|
||||
:return: The ``Accept`` header parsed
|
||||
:rtype: AcceptContainer
|
||||
:rtype: AcceptList
|
||||
"""
|
||||
if self.parsed_accept is None:
|
||||
accept_header = self.headers.getone("accept", "")
|
||||
|
||||
Reference in New Issue
Block a user