Accept header parsing (#2200)

* Add some tests

* docstring

* Add accept matching

* Add some more tests on matching

* Add matching flags for wildcards

* Add mathing controls to accept

* Limit uvicorn 14 in testing
This commit is contained in:
Adam Hopkins
2021-08-19 21:09:40 +03:00
committed by GitHub
parent e2eefaac55
commit f32ef20b74
6 changed files with 387 additions and 4 deletions

View File

@@ -126,8 +126,11 @@ class HeaderNotFound(InvalidUsage):
**Status**: 400 Bad Request
"""
status_code = 400
quiet = True
class InvalidHeader(InvalidUsage):
"""
**Status**: 400 Bad Request
"""
class ContentRangeError(SanicException):

View File

@@ -1,8 +1,11 @@
from __future__ import annotations
import re
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from urllib.parse import unquote
from sanic.exceptions import InvalidHeader
from sanic.helpers import STATUS_CODES
@@ -30,6 +33,154 @@ _host_re = re.compile(
# For more information, consult ../tests/test_requests.py
def parse_arg_as_accept(f):
def func(self, other, *args, **kwargs):
if not isinstance(other, Accept):
other = Accept.parse(other)
return f(self, other, *args, **kwargs)
return func
class MediaType(str):
def __new__(cls, value: str):
return str.__new__(cls, value)
def __init__(self, value: str) -> None:
self.value = value
self.is_wildcard = self.check_if_wildcard(value)
def __eq__(self, other):
if self.is_wildcard:
return True
if self.match(other):
return True
other_is_wildcard = (
other.is_wildcard
if isinstance(other, MediaType)
else self.check_if_wildcard(other)
)
return other_is_wildcard
def match(self, other):
other_value = other.value if isinstance(other, MediaType) else other
return self.value == other_value
@staticmethod
def check_if_wildcard(value):
return value == "*"
class Accept(str):
def __new__(cls, value: str, *args, **kwargs):
return str.__new__(cls, value)
def __init__(
self,
value: str,
type_: MediaType,
subtype: MediaType,
*,
q: str = "1.0",
**kwargs: str,
):
qvalue = float(q)
if qvalue > 1 or qvalue < 0:
raise InvalidHeader(
f"Accept header qvalue must be between 0 and 1, not: {qvalue}"
)
self.value = value
self.type_ = type_
self.subtype = subtype
self.qvalue = qvalue
self.params = kwargs
def _compare(self, other, method):
try:
return method(self.qvalue, other.qvalue)
except (AttributeError, TypeError):
return NotImplemented
@parse_arg_as_accept
def __lt__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s < o)
@parse_arg_as_accept
def __le__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s <= o)
@parse_arg_as_accept
def __eq__(self, other: Union[str, Accept]): # type: ignore
return self._compare(other, lambda s, o: s == o)
@parse_arg_as_accept
def __ge__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s >= o)
@parse_arg_as_accept
def __gt__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s > o)
@parse_arg_as_accept
def __ne__(self, other: Union[str, Accept]): # type: ignore
return self._compare(other, lambda s, o: s != o)
@parse_arg_as_accept
def match(
self,
other,
*,
allow_type_wildcard: bool = True,
allow_subtype_wildcard: bool = True,
) -> bool:
type_match = (
self.type_ == other.type_
if allow_type_wildcard
else (
self.type_.match(other.type_)
and not self.type_.is_wildcard
and not other.type_.is_wildcard
)
)
subtype_match = (
self.subtype == other.subtype
if allow_subtype_wildcard
else (
self.subtype.match(other.subtype)
and not self.subtype.is_wildcard
and not other.subtype.is_wildcard
)
)
return type_match and subtype_match
@classmethod
def parse(cls, raw: str) -> Accept:
invalid = False
mtype = raw.strip()
try:
media, *raw_params = mtype.split(";")
type_, subtype = media.split("/")
except ValueError:
invalid = True
if invalid or not type_ or not subtype:
raise InvalidHeader(f"Header contains invalid Accept value: {raw}")
params = dict(
[
(key.strip(), value.strip())
for key, value in (param.split("=", 1) for param in raw_params)
]
)
return cls(mtype, MediaType(type_), MediaType(subtype), **params)
def parse_content_header(value: str) -> Tuple[str, Options]:
"""Parse content-type and content-disposition header values.
@@ -194,3 +345,29 @@ def format_http1_response(status: int, headers: HeaderBytesIterable) -> bytes:
ret += b"%b: %b\r\n" % h
ret += b"\r\n"
return ret
def _sort_accept_value(accept: Accept):
return (
accept.qvalue,
len(accept.params),
accept.subtype != "*",
accept.type_ != "*",
)
def parse_accept(accept: str) -> List[Accept]:
"""Parse an Accept header and order the acceptable media types in
accorsing to RFC 7231, s. 5.3.2
https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2
"""
media_types = accept.split(",")
accept_list: List[Accept] = []
for mtype in media_types:
if not mtype:
continue
accept_list.append(Accept.parse(mtype))
return sorted(accept_list, key=_sort_accept_value, reverse=True)

View File

@@ -34,7 +34,9 @@ from sanic.compat import CancelledErrors, Header
from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE
from sanic.exceptions import InvalidUsage
from sanic.headers import (
Accept,
Options,
parse_accept,
parse_content_header,
parse_forwarded,
parse_host,
@@ -94,6 +96,7 @@ class Request:
"head",
"headers",
"method",
"parsed_accept",
"parsed_args",
"parsed_not_grouped_args",
"parsed_files",
@@ -136,6 +139,7 @@ class Request:
self.conn_info: Optional[ConnInfo] = None
self.ctx = SimpleNamespace()
self.parsed_forwarded: Optional[Options] = None
self.parsed_accept: Optional[List[Accept]] = None
self.parsed_json = None
self.parsed_form = None
self.parsed_files = None
@@ -296,6 +300,13 @@ class Request:
return self.parsed_json
@property
def accept(self) -> List[Accept]:
if self.parsed_accept is None:
accept_header = self.headers.getone("accept", "")
self.parsed_accept = parse_accept(accept_header)
return self.parsed_accept
@property
def token(self):
"""Attempt to return the auth header token.