Add convenience methods for cookie creation and deletion (#2706)

* Add convenience methods for cookie creation and deletion

* Restore del

* Backwards compat, forward thinking

* Add delitem deprecation notice

* Add get full deprecation notice

* Add deprecation docstring

* Better deprecation docstring

* Add has_cookie

* Same defaults

* Better deprecation message

* Accessor annotations

* make pretty

* parse cookies

* Revert quote translator

* make pretty

* make pretty

* Add unit tests

* Make pretty

* Fix unit tests

* Directly include unquote

* Add some more unit tests

* Move modules into their own dir

* make pretty

* cleanup test imports

* Add test for cookie accessor

* Make test consistent

* Remove file

* Remove additional escaping

* Add header style getattr for hyphens

* Add test for cookie accessor with hyphens

* Add new translator

* Parametrize test_request_with_duplicate_cookie_key

* make pretty

* Add deprecation of direct cookie encoding

* Speedup Cookie creation

* Implement prefixes on delete_cookie

* typing changes

* Add passthru functions on response objects for setting cookies

* Add test for passthru

---------

Co-authored-by: L. Kärkkäinen <98187+Tronic@users.noreply.github.com>
This commit is contained in:
Adam Hopkins
2023-03-21 11:25:35 +02:00
committed by GitHub
parent 61aa16f6ac
commit 1a63b9bec0
13 changed files with 1318 additions and 316 deletions

View File

@@ -1,156 +0,0 @@
import re
import string
from datetime import datetime
from typing import Dict
DEFAULT_MAX_AGE = 0
# ------------------------------------------------------------ #
# SimpleCookie
# ------------------------------------------------------------ #
# Straight up copied this section of dark magic from SimpleCookie
_LegalChars = string.ascii_letters + string.digits + "!#$%&'*+-.^_`|~:"
_UnescapedChars = _LegalChars + " ()/<=>?@[]{}"
_Translator = {
n: "\\%03o" % n for n in set(range(256)) - set(map(ord, _UnescapedChars))
}
_Translator.update({ord('"'): '\\"', ord("\\"): "\\\\"})
def _quote(str):
r"""Quote a string for use in a cookie header.
If the string does not need to be double-quoted, then just return the
string. Otherwise, surround the string in doublequotes and quote
(with a \) special characters.
"""
if str is None or _is_legal_key(str):
return str
else:
return '"' + str.translate(_Translator) + '"'
_is_legal_key = re.compile("[%s]+" % re.escape(_LegalChars)).fullmatch
# ------------------------------------------------------------ #
# Custom SimpleCookie
# ------------------------------------------------------------ #
class CookieJar(dict):
"""
CookieJar dynamically writes headers as cookies are added and removed
It gets around the limitation of one header per name by using the
MultiHeader class to provide a unique key that encodes to Set-Cookie.
"""
def __init__(self, headers):
super().__init__()
self.headers: Dict[str, str] = headers
self.cookie_headers: Dict[str, str] = {}
self.header_key: str = "Set-Cookie"
def __setitem__(self, key, value):
# If this cookie doesn't exist, add it to the header keys
if not self.cookie_headers.get(key):
cookie = Cookie(key, value)
cookie["path"] = "/"
self.cookie_headers[key] = self.header_key
self.headers.add(self.header_key, cookie)
return super().__setitem__(key, cookie)
else:
self[key].value = value
def __delitem__(self, key):
if key not in self.cookie_headers:
self[key] = ""
self[key]["max-age"] = 0
else:
cookie_header = self.cookie_headers[key]
# remove it from header
cookies = self.headers.popall(cookie_header)
for cookie in cookies:
if cookie.key != key:
self.headers.add(cookie_header, cookie)
del self.cookie_headers[key]
return super().__delitem__(key)
class Cookie(dict):
"""A stripped down version of Morsel from SimpleCookie #gottagofast"""
_keys = {
"expires": "expires",
"path": "Path",
"comment": "Comment",
"domain": "Domain",
"max-age": "Max-Age",
"secure": "Secure",
"httponly": "HttpOnly",
"version": "Version",
"samesite": "SameSite",
}
_flags = {"secure", "httponly"}
def __init__(self, key, value):
if key in self._keys:
raise KeyError("Cookie name is a reserved word")
if not _is_legal_key(key):
raise KeyError("Cookie key contains illegal characters")
self.key = key
self.value = value
super().__init__()
def __setitem__(self, key, value):
if key not in self._keys:
raise KeyError("Unknown cookie property")
if value is not False:
if key.lower() == "max-age":
if not str(value).isdigit():
raise ValueError("Cookie max-age must be an integer")
elif key.lower() == "expires":
if not isinstance(value, datetime):
raise TypeError(
"Cookie 'expires' property must be a datetime"
)
return super().__setitem__(key, value)
def encode(self, encoding):
"""
Encode the cookie content in a specific type of encoding instructed
by the developer. Leverages the :func:`str.encode` method provided
by python.
This method can be used to encode and embed ``utf-8`` content into
the cookies.
:param encoding: Encoding to be used with the cookie
:return: Cookie encoded in a codec of choosing.
:except: UnicodeEncodeError
"""
return str(self).encode(encoding)
def __str__(self):
"""Format as a Set-Cookie header value."""
output = ["%s=%s" % (self.key, _quote(self.value))]
for key, value in self.items():
if key == "max-age":
try:
output.append("%s=%d" % (self._keys[key], value))
except TypeError:
output.append("%s=%s" % (self._keys[key], value))
elif key == "expires":
output.append(
"%s=%s"
% (self._keys[key], value.strftime("%a, %d-%b-%Y %T GMT"))
)
elif key in self._flags and self[key]:
output.append(self._keys[key])
else:
output.append("%s=%s" % (self._keys[key], value))
return "; ".join(output)

View File

@@ -0,0 +1,4 @@
from .response import Cookie, CookieJar
__all__ = ("Cookie", "CookieJar")

118
sanic/cookies/request.py Normal file
View File

@@ -0,0 +1,118 @@
import re
from typing import Any, Dict, List, Optional
from sanic.cookies.response import Cookie
from sanic.log import deprecation
from sanic.request.parameters import RequestParameters
COOKIE_NAME_RESERVED_CHARS = re.compile(
'[\x00-\x1F\x7F-\xFF()<>@,;:\\\\"/[\\]?={} \x09]'
)
OCTAL_PATTERN = re.compile(r"\\[0-3][0-7][0-7]")
QUOTE_PATTERN = re.compile(r"[\\].")
def _unquote(str): # no cov
if str is None or len(str) < 2:
return str
if str[0] != '"' or str[-1] != '"':
return str
str = str[1:-1]
i = 0
n = len(str)
res = []
while 0 <= i < n:
o_match = OCTAL_PATTERN.search(str, i)
q_match = QUOTE_PATTERN.search(str, i)
if not o_match and not q_match:
res.append(str[i:])
break
# else:
j = k = -1
if o_match:
j = o_match.start(0)
if q_match:
k = q_match.start(0)
if q_match and (not o_match or k < j):
res.append(str[i:k])
res.append(str[k + 1])
i = k + 2
else:
res.append(str[i:j])
res.append(chr(int(str[j + 1 : j + 4], 8))) # noqa: E203
i = j + 4
return "".join(res)
def parse_cookie(raw: str):
cookies: Dict[str, List] = {}
for token in raw.split(";"):
name, __, value = token.partition("=")
name = name.strip()
value = value.strip()
if not name:
continue
if COOKIE_NAME_RESERVED_CHARS.search(name): # no cov
continue
if len(value) > 2 and value[0] == '"' and value[-1] == '"': # no cov
value = _unquote(value)
if name in cookies:
cookies[name].append(value)
else:
cookies[name] = [value]
return cookies
class CookieRequestParameters(RequestParameters):
def __getitem__(self, key: str) -> Optional[str]:
deprecation(
f"You are accessing cookie key '{key}', which is currently in "
"compat mode returning a single cookie value. Starting in v24.3 "
"accessing a cookie value like this will return a list of values. "
"To avoid this behavior and continue accessing a single value, "
f"please upgrade from request.cookies['{key}'] to "
f"request.cookies.get('{key}'). See more details: ___.",
24.3,
)
try:
value = self._get_prefixed_cookie(key)
except KeyError:
value = super().__getitem__(key)
return value[0]
def __getattr__(self, key: str) -> str:
if key.startswith("_"):
return self.__getattribute__(key)
key = key.rstrip("_").replace("_", "-")
return str(self.get(key, ""))
def get(self, name: str, default: Optional[Any] = None) -> Optional[Any]:
try:
return self._get_prefixed_cookie(name)[0]
except KeyError:
return super().get(name, default)
def getlist(
self, name: str, default: Optional[Any] = None
) -> Optional[Any]:
try:
return self._get_prefixed_cookie(name)
except KeyError:
return super().getlist(name, default)
def _get_prefixed_cookie(self, name: str) -> Any:
getitem = super().__getitem__
try:
return getitem(f"{Cookie.HOST_PREFIX}{name}")
except KeyError:
return getitem(f"{Cookie.SECURE_PREFIX}{name}")

608
sanic/cookies/response.py Normal file
View File

@@ -0,0 +1,608 @@
from __future__ import annotations
import re
import string
import sys
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from sanic.exceptions import ServerError
from sanic.log import deprecation
if TYPE_CHECKING:
from sanic.compat import Header
if sys.version_info < (3, 8): # no cov
SameSite = str
else: # no cov
from typing import Literal
SameSite = Union[
Literal["Strict"],
Literal["Lax"],
Literal["None"],
Literal["strict"],
Literal["lax"],
Literal["none"],
]
DEFAULT_MAX_AGE = 0
SAMESITE_VALUES = ("strict", "lax", "none")
LEGAL_CHARS = string.ascii_letters + string.digits + "!#$%&'*+-.^_`|~:"
UNESCAPED_CHARS = LEGAL_CHARS + " ()/<=>?@[]{}"
TRANSLATOR = {ch: f"\\{ch:03o}" for ch in bytes(range(32)) + b'";\\\x7F'}
def _quote(str): # no cov
r"""Quote a string for use in a cookie header.
If the string does not need to be double-quoted, then just return the
string. Otherwise, surround the string in doublequotes and quote
(with a \) special characters.
"""
if str is None or _is_legal_key(str):
return str
else:
return f'"{str.translate(TRANSLATOR)}"'
_is_legal_key = re.compile("[%s]+" % re.escape(LEGAL_CHARS)).fullmatch
# In v24.3, we should remove this as being a subclass of dict
class CookieJar(dict):
"""
CookieJar dynamically writes headers as cookies are added and removed
It gets around the limitation of one header per name by using the
MultiHeader class to provide a unique key that encodes to Set-Cookie.
"""
HEADER_KEY = "Set-Cookie"
def __init__(self, headers: Header):
super().__init__()
self.headers = headers
def __setitem__(self, key, value):
# If this cookie doesn't exist, add it to the header keys
deprecation(
"Setting cookie values using the dict pattern has been "
"deprecated. You should instead use the cookies.add_cookie "
"method. To learn more, please see: ___.",
0,
)
if key not in self:
self.add_cookie(key, value, secure=False, samesite=None)
else:
self[key].value = value
def __delitem__(self, key):
deprecation(
"Deleting cookie values using the dict pattern has been "
"deprecated. You should instead use the cookies.delete_cookie "
"method. To learn more, please see: ___.",
0,
)
if key in self:
super().__delitem__(key)
self.delete_cookie(key)
def __len__(self): # no cov
return len(self.cookies)
def __getitem__(self, key: str) -> Cookie:
deprecation(
"Accessing cookies from the CookieJar by dict key is deprecated. "
"You should instead use the cookies.get_cookie method. "
"To learn more, please see: ___.",
0,
)
return super().__getitem__(key)
def __iter__(self): # no cov
deprecation(
"Iterating over the CookieJar has been deprecated and will be "
"removed in v24.3. To learn more, please see: ___.",
24.3,
)
return super().__iter__()
def keys(self): # no cov
deprecation(
"Accessing CookieJar.keys() has been deprecated and will be "
"removed in v24.3. To learn more, please see: ___.",
24.3,
)
return super().keys()
def values(self): # no cov
deprecation(
"Accessing CookieJar.values() has been deprecated and will be "
"removed in v24.3. To learn more, please see: ___.",
24.3,
)
return super().values()
def items(self): # no cov
deprecation(
"Accessing CookieJar.items() has been deprecated and will be "
"removed in v24.3. To learn more, please see: ___.",
24.3,
)
return super().items()
def get(self, *args, **kwargs): # no cov
deprecation(
"Accessing cookies from the CookieJar using get is deprecated "
"and will be removed in v24.3. You should instead use the "
"cookies.get_cookie method. To learn more, please see: ___.",
24.3,
)
return super().get(*args, **kwargs)
def pop(self, key, *args, **kwargs): # no cov
deprecation(
"Using CookieJar.pop() has been deprecated and will be "
"removed in v24.3. To learn more, please see: ___.",
24.3,
)
self.delete(key)
return super().pop(key, *args, **kwargs)
@property
def header_key(self): # no cov
deprecation(
"The CookieJar.header_key property has been deprecated and will "
"be removed in version 24.3. Use CookieJar.HEADER_KEY. ",
24.3,
)
return CookieJar.HEADER_KEY
@property
def cookie_headers(self) -> Dict[str, str]: # no cov
deprecation(
"The CookieJar.coookie_headers property has been deprecated "
"and will be removed in version 24.3. If you need to check if a "
"particular cookie key has been set, use CookieJar.has_cookie.",
24.3,
)
return {key: self.header_key for key in self}
@property
def cookies(self) -> List[Cookie]:
return self.headers.getall(self.HEADER_KEY)
def get_cookie(
self,
key: str,
path: str = "/",
domain: Optional[str] = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> Optional[Cookie]:
for cookie in self.cookies:
if (
cookie.key == Cookie.make_key(key, host_prefix, secure_prefix)
and cookie.path == path
and cookie.domain == domain
):
return cookie
return None
def has_cookie(
self,
key: str,
path: str = "/",
domain: Optional[str] = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> bool:
for cookie in self.cookies:
if (
cookie.key == Cookie.make_key(key, host_prefix, secure_prefix)
and cookie.path == path
and cookie.domain == domain
):
return True
return False
def add_cookie(
self,
key: str,
value: str,
*,
path: str = "/",
domain: Optional[str] = None,
secure: bool = True,
max_age: Optional[int] = None,
expires: Optional[datetime] = None,
httponly: bool = False,
samesite: Optional[SameSite] = "Lax",
partitioned: bool = False,
comment: Optional[str] = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> Cookie:
"""
Add a cookie to the CookieJar
:param key: Key of the cookie
:type key: str
:param value: Value of the cookie
:type value: str
:param path: Path of the cookie, defaults to None
:type path: Optional[str], optional
:param domain: Domain of the cookie, defaults to None
:type domain: Optional[str], optional
:param secure: Whether to set it as a secure cookie, defaults to True
:type secure: bool
:param max_age: Max age of the cookie in seconds; if set to 0 a
browser should delete it, defaults to None
:type max_age: Optional[int], optional
:param expires: When the cookie expires; if set to None browsers
should set it as a session cookie, defaults to None
:type expires: Optional[datetime], optional
:param httponly: Whether to set it as HTTP only, defaults to False
:type httponly: bool
:param samesite: How to set the samesite property, should be
strict, lax or none (case insensitive), defaults to Lax
:type samesite: Optional[SameSite], optional
:param partitioned: Whether to set it as partitioned, defaults to False
:type partitioned: bool
:param comment: A cookie comment, defaults to None
:type comment: Optional[str], optional
:param host_prefix: Whether to add __Host- as a prefix to the key.
This requires that path="/", domain=None, and secure=True,
defaults to False
:type host_prefix: bool
:param secure_prefix: Whether to add __Secure- as a prefix to the key.
This requires that secure=True, defaults to False
:type secure_prefix: bool
:return: The instance of the created cookie
:rtype: Cookie
"""
cookie = Cookie(
key,
value,
path=path,
expires=expires,
comment=comment,
domain=domain,
max_age=max_age,
secure=secure,
httponly=httponly,
samesite=samesite,
partitioned=partitioned,
host_prefix=host_prefix,
secure_prefix=secure_prefix,
)
self.headers.add(self.HEADER_KEY, cookie)
# This should be removed in v24.3
super().__setitem__(key, cookie)
return cookie
def delete_cookie(
self,
key: str,
*,
path: str = "/",
domain: Optional[str] = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> None:
"""
Delete a cookie
This will effectively set it as Max-Age: 0, which a browser should
interpret it to mean: "delete the cookie".
Since it is a browser/client implementation, your results may vary
depending upon which client is being used.
:param key: The key to be deleted
:type key: str
:param path: Path of the cookie, defaults to None
:type path: Optional[str], optional
:param domain: Domain of the cookie, defaults to None
:type domain: Optional[str], optional
:param host_prefix: Whether to add __Host- as a prefix to the key.
This requires that path="/", domain=None, and secure=True,
defaults to False
:type host_prefix: bool
:param secure_prefix: Whether to add __Secure- as a prefix to the key.
This requires that secure=True, defaults to False
:type secure_prefix: bool
"""
# remove it from header
cookies: List[Cookie] = self.headers.popall(self.HEADER_KEY, [])
for cookie in cookies:
if (
cookie.key != Cookie.make_key(key, host_prefix, secure_prefix)
or cookie.path != path
or cookie.domain != domain
):
self.headers.add(self.HEADER_KEY, cookie)
# This should be removed in v24.3
try:
super().__delitem__(key)
except KeyError:
...
self.add_cookie(
key=key,
value="",
path=path,
domain=domain,
max_age=0,
samesite=None,
host_prefix=host_prefix,
secure_prefix=secure_prefix,
)
# In v24.3, we should remove this as being a subclass of dict
# Instead, it should be an object with __slots__
# All of the current property accessors should be removed in favor
# of actual slotted properties.
class Cookie(dict):
"""A stripped down version of Morsel from SimpleCookie"""
HOST_PREFIX = "__Host-"
SECURE_PREFIX = "__Secure-"
_keys = {
"path": "Path",
"comment": "Comment",
"domain": "Domain",
"max-age": "Max-Age",
"expires": "expires",
"samesite": "SameSite",
"version": "Version",
"secure": "Secure",
"httponly": "HttpOnly",
"partitioned": "Partitioned",
}
_flags = {"secure", "httponly", "partitioned"}
def __init__(
self,
key: str,
value: str,
*,
path: str = "/",
domain: Optional[str] = None,
secure: bool = True,
max_age: Optional[int] = None,
expires: Optional[datetime] = None,
httponly: bool = False,
samesite: Optional[SameSite] = "Lax",
partitioned: bool = False,
comment: Optional[str] = None,
host_prefix: bool = False,
secure_prefix: bool = False,
):
if key in self._keys:
raise KeyError("Cookie name is a reserved word")
if not _is_legal_key(key):
raise KeyError("Cookie key contains illegal characters")
if host_prefix:
if not secure:
raise ServerError(
"Cannot set host_prefix on a cookie without secure=True"
)
if path != "/":
raise ServerError(
"Cannot set host_prefix on a cookie unless path='/'"
)
if domain:
raise ServerError(
"Cannot set host_prefix on a cookie with a defined domain"
)
elif secure_prefix and not secure:
raise ServerError(
"Cannot set secure_prefix on a cookie without secure=True"
)
if partitioned and not host_prefix:
# This is technically possible, but it is not advisable so we will
# take a stand and say "don't shoot yourself in the foot"
raise ServerError(
"Cannot create a partitioned cookie without "
"also setting host_prefix=True"
)
self.key = self.make_key(key, host_prefix, secure_prefix)
self.value = value
super().__init__()
# This is a temporary solution while this object is a dict. We update
# all of the values in bulk, except for the values that have
# key-specific validation in _set_value
self.update(
{
"path": path,
"comment": comment,
"domain": domain,
"secure": secure,
"httponly": httponly,
"partitioned": partitioned,
"expires": None,
"max-age": None,
"samesite": None,
}
)
if expires is not None:
self._set_value("expires", expires)
if max_age is not None:
self._set_value("max-age", max_age)
if samesite is not None:
self._set_value("samesite", samesite)
def __setitem__(self, key, value):
deprecation(
"Setting values on a Cookie object as a dict has been deprecated. "
"This feature will be removed in v24.3. You should instead set "
f"values on cookies as object properties: cookie.{key}=... ",
24.3,
)
self._set_value(key, value)
# This is a temporary method for backwards compat and should be removed
# in v24.3 when this is no longer a dict
def _set_value(self, key: str, value: Any) -> None:
if key not in self._keys:
raise KeyError("Unknown cookie property: %s=%s" % (key, value))
if value is not None:
if key.lower() == "max-age" and not str(value).isdigit():
raise ValueError("Cookie max-age must be an integer")
elif key.lower() == "expires" and not isinstance(value, datetime):
raise TypeError("Cookie 'expires' property must be a datetime")
elif key.lower() == "samesite":
if value.lower() not in SAMESITE_VALUES:
raise TypeError(
"Cookie 'samesite' property must "
f"be one of: {','.join(SAMESITE_VALUES)}"
)
value = value.title()
super().__setitem__(key, value)
def encode(self, encoding):
"""
Encode the cookie content in a specific type of encoding instructed
by the developer. Leverages the :func:`str.encode` method provided
by python.
This method can be used to encode and embed ``utf-8`` content into
the cookies.
:param encoding: Encoding to be used with the cookie
:return: Cookie encoded in a codec of choosing.
:except: UnicodeEncodeError
"""
deprecation(
"Direct encoding of a Cookie object has been deprecated and will "
"be removed in v24.3.",
24.3,
)
return str(self).encode(encoding)
def __str__(self):
"""Format as a Set-Cookie header value."""
output = ["%s=%s" % (self.key, _quote(self.value))]
key_index = list(self._keys)
for key, value in sorted(
self.items(), key=lambda x: key_index.index(x[0])
):
if value is not None and value is not False:
if key == "max-age":
try:
output.append("%s=%d" % (self._keys[key], value))
except TypeError:
output.append("%s=%s" % (self._keys[key], value))
elif key == "expires":
output.append(
"%s=%s"
% (
self._keys[key],
value.strftime("%a, %d-%b-%Y %T GMT"),
)
)
elif key in self._flags:
output.append(self._keys[key])
else:
output.append("%s=%s" % (self._keys[key], value))
return "; ".join(output)
@property
def path(self) -> str: # no cov
return self["path"]
@path.setter
def path(self, value: str) -> None: # no cov
self._set_value("path", value)
@property
def expires(self) -> Optional[datetime]: # no cov
return self.get("expires")
@expires.setter
def expires(self, value: datetime) -> None: # no cov
self._set_value("expires", value)
@property
def comment(self) -> Optional[str]: # no cov
return self.get("comment")
@comment.setter
def comment(self, value: str) -> None: # no cov
self._set_value("comment", value)
@property
def domain(self) -> Optional[str]: # no cov
return self.get("domain")
@domain.setter
def domain(self, value: str) -> None: # no cov
self._set_value("domain", value)
@property
def max_age(self) -> Optional[int]: # no cov
return self.get("max-age")
@max_age.setter
def max_age(self, value: int) -> None: # no cov
self._set_value("max-age", value)
@property
def secure(self) -> bool: # no cov
return self.get("secure", False)
@secure.setter
def secure(self, value: bool) -> None: # no cov
self._set_value("secure", value)
@property
def httponly(self) -> bool: # no cov
return self.get("httponly", False)
@httponly.setter
def httponly(self, value: bool) -> None: # no cov
self._set_value("httponly", value)
@property
def samesite(self) -> Optional[SameSite]: # no cov
return self.get("samesite")
@samesite.setter
def samesite(self, value: SameSite) -> None: # no cov
self._set_value("samesite", value)
@property
def partitioned(self) -> bool: # no cov
return self.get("partitioned", False)
@partitioned.setter
def partitioned(self, value: bool) -> None: # no cov
self._set_value("partitioned", value)
@classmethod
def make_key(
cls, key: str, host_prefix: bool = False, secure_prefix: bool = False
) -> str:
if host_prefix and secure_prefix:
raise ServerError(
"Both host_prefix and secure_prefix were requested. "
"A cookie should have only one prefix."
)
elif host_prefix:
key = cls.HOST_PREFIX + key
elif secure_prefix:
key = cls.SECURE_PREFIX + key
return key

View File

@@ -126,7 +126,26 @@ logger.addFilter(_verbosity_filter)
def deprecation(message: str, version: float): # no cov
version_info = f"[DEPRECATION v{version}] "
"""
Add a deprecation notice
Example when a feature is being removed. In this case, version
should be AT LEAST next version + 2
deprecation("Helpful message", 99.9)
Example when a feature is deprecated but not being removed:
deprecation("Helpful message", 0)
:param message: The message of the notice
:type message: str
:param version: The version when the feature will be removed. If it is
not being removed, then set version=0.
:type version: float
"""
version_display = f" v{version}" if version else ""
version_info = f"[DEPRECATION{version_display}] "
if is_atty():
version_info = f"{Colors.RED}{version_info}"
message = f"{Colors.YELLOW}{message}{Colors.END}"

11
sanic/request/__init__.py Normal file
View File

@@ -0,0 +1,11 @@
from .form import File, parse_multipart_form
from .parameters import RequestParameters
from .types import Request
__all__ = (
"File",
"parse_multipart_form",
"Request",
"RequestParameters",
)

110
sanic/request/form.py Normal file
View File

@@ -0,0 +1,110 @@
from __future__ import annotations
import email.utils
import unicodedata
from typing import NamedTuple
from urllib.parse import unquote
from sanic.headers import parse_content_header
from sanic.log import logger
from .parameters import RequestParameters
class File(NamedTuple):
"""
Model for defining a file. It is a ``namedtuple``, therefore you can
iterate over the object, or access the parameters by name.
:param type: The mimetype, defaults to text/plain
:param body: Bytes of the file
:param name: The filename
"""
type: str
body: bytes
name: str
def parse_multipart_form(body, boundary):
"""
Parse a request body and returns fields and files
:param body: bytes request body
:param boundary: bytes multipart boundary
:return: fields (RequestParameters), files (RequestParameters)
"""
files = {}
fields = {}
form_parts = body.split(boundary)
for form_part in form_parts[1:-1]:
file_name = None
content_type = "text/plain"
content_charset = "utf-8"
field_name = None
line_index = 2
line_end_index = 0
while not line_end_index == -1:
line_end_index = form_part.find(b"\r\n", line_index)
form_line = form_part[line_index:line_end_index].decode("utf-8")
line_index = line_end_index + 2
if not form_line:
break
colon_index = form_line.index(":")
idx = colon_index + 2
form_header_field = form_line[0:colon_index].lower()
form_header_value, form_parameters = parse_content_header(
form_line[idx:]
)
if form_header_field == "content-disposition":
field_name = form_parameters.get("name")
file_name = form_parameters.get("filename")
# non-ASCII filenames in RFC2231, "filename*" format
if file_name is None and form_parameters.get("filename*"):
encoding, _, value = email.utils.decode_rfc2231(
form_parameters["filename*"]
)
file_name = unquote(value, encoding=encoding)
# Normalize to NFC (Apple MacOS/iOS send NFD)
# Notes:
# - No effect for Windows, Linux or Android clients which
# already send NFC
# - Python open() is tricky (creates files in NFC no matter
# which form you use)
if file_name is not None:
file_name = unicodedata.normalize("NFC", file_name)
elif form_header_field == "content-type":
content_type = form_header_value
content_charset = form_parameters.get("charset", "utf-8")
if field_name:
post_data = form_part[line_index:-4]
if file_name is None:
value = post_data.decode(content_charset)
if field_name in fields:
fields[field_name].append(value)
else:
fields[field_name] = [value]
else:
form_file = File(
type=content_type, name=file_name, body=post_data
)
if field_name in files:
files[field_name].append(form_file)
else:
files[field_name] = [form_file]
else:
logger.debug(
"Form-data field does not have a 'name' parameter "
"in the Content-Disposition header"
)
return RequestParameters(fields), RequestParameters(files)

View File

@@ -0,0 +1,22 @@
from __future__ import annotations
from typing import Any, Optional
class RequestParameters(dict):
"""
Hosts a dict with lists as values where get returns the first
value of the list and getlist returns the whole shebang
"""
def get(self, name: str, default: Optional[Any] = None) -> Optional[Any]:
"""Return the first value, either the default or actual"""
return super().get(name, [default])[0]
def getlist(
self, name: str, default: Optional[Any] = None
) -> Optional[Any]:
"""
Return the entire list
"""
return super().get(name, default)

View File

@@ -8,10 +8,10 @@ from typing import (
DefaultDict,
Dict,
List,
NamedTuple,
Optional,
Tuple,
Union,
cast,
)
from sanic_routing.route import Route
@@ -26,14 +26,11 @@ if TYPE_CHECKING:
from sanic.server import ConnInfo
from sanic.app import Sanic
import email.utils
import unicodedata
import uuid
from collections import defaultdict
from http.cookies import SimpleCookie
from types import SimpleNamespace
from urllib.parse import parse_qs, parse_qsl, unquote, urlunparse
from urllib.parse import parse_qs, parse_qsl, urlunparse
from httptools import parse_url
from httptools.parser.errors import HttpParserInvalidURLError
@@ -45,6 +42,7 @@ from sanic.constants import (
IDEMPOTENT_HTTP_METHODS,
SAFE_HTTP_METHODS,
)
from sanic.cookies.request import CookieRequestParameters, parse_cookie
from sanic.exceptions import BadRequest, BadURL, ServerError
from sanic.headers import (
AcceptList,
@@ -57,10 +55,13 @@ from sanic.headers import (
parse_xforwarded,
)
from sanic.http import Stage
from sanic.log import deprecation, error_logger, logger
from sanic.log import deprecation, error_logger
from sanic.models.protocol_types import TransportProtocol
from sanic.response import BaseHTTPResponse, HTTPResponse
from .form import parse_multipart_form
from .parameters import RequestParameters
try:
from ujson import loads as json_loads # type: ignore
@@ -68,25 +69,6 @@ except ImportError:
from json import loads as json_loads # type: ignore
class RequestParameters(dict):
"""
Hosts a dict with lists as values where get returns the first
value of the list and getlist returns the whole shebang
"""
def get(self, name: str, default: Optional[Any] = None) -> Optional[Any]:
"""Return the first value, either the default or actual"""
return super().get(name, [default])[0]
def getlist(
self, name: str, default: Optional[Any] = None
) -> Optional[Any]:
"""
Return the entire list
"""
return super().get(name, default)
class Request:
"""
Properties of an HTTP request such as URL, headers, etc.
@@ -120,6 +102,7 @@ class Request:
"method",
"parsed_accept",
"parsed_args",
"parsed_cookies",
"parsed_credentials",
"parsed_files",
"parsed_form",
@@ -166,25 +149,25 @@ class Request:
self.body = b""
self.conn_info: Optional[ConnInfo] = None
self.ctx = SimpleNamespace()
self.parsed_forwarded: Optional[Options] = None
self.parsed_accept: Optional[AcceptList] = None
self.parsed_credentials: Optional[Credentials] = None
self.parsed_json = None
self.parsed_form: Optional[RequestParameters] = None
self.parsed_files: Optional[RequestParameters] = None
self.parsed_token: Optional[str] = None
self.parsed_args: DefaultDict[
Tuple[bool, bool, str, str], RequestParameters
] = defaultdict(RequestParameters)
self.parsed_cookies: Optional[RequestParameters] = None
self.parsed_credentials: Optional[Credentials] = None
self.parsed_files: Optional[RequestParameters] = None
self.parsed_form: Optional[RequestParameters] = None
self.parsed_forwarded: Optional[Options] = None
self.parsed_json = None
self.parsed_not_grouped_args: DefaultDict[
Tuple[bool, bool, str, str], List[Tuple[str, str]]
] = defaultdict(list)
self.parsed_token: Optional[str] = None
self._request_middleware_started = False
self._response_middleware_started = False
self.responded: bool = False
self.route: Optional[Route] = None
self.stream: Optional[Stream] = None
self._cookies: Optional[Dict[str, str]] = None
self._match_info: Dict[str, Any] = {}
self._protocol = None
@@ -731,24 +714,21 @@ class Request:
default values.
"""
def get_cookies(self) -> RequestParameters:
cookie = self.headers.getone("cookie", "")
self.parsed_cookies = CookieRequestParameters(parse_cookie(cookie))
return self.parsed_cookies
@property
def cookies(self) -> Dict[str, str]:
def cookies(self) -> RequestParameters:
"""
:return: Incoming cookies on the request
:rtype: Dict[str, str]
"""
if self._cookies is None:
cookie = self.headers.getone("cookie", None)
if cookie is not None:
cookies: SimpleCookie = SimpleCookie()
cookies.load(cookie)
self._cookies = {
name: cookie.value for name, cookie in cookies.items()
}
else:
self._cookies = {}
return self._cookies
if self.parsed_cookies is None:
self.get_cookies()
return cast(CookieRequestParameters, self.parsed_cookies)
@property
def content_type(self) -> str:
@@ -1026,101 +1006,3 @@ class Request:
:rtype: bool
"""
return self.method in CACHEABLE_HTTP_METHODS
class File(NamedTuple):
"""
Model for defining a file. It is a ``namedtuple``, therefore you can
iterate over the object, or access the parameters by name.
:param type: The mimetype, defaults to text/plain
:param body: Bytes of the file
:param name: The filename
"""
type: str
body: bytes
name: str
def parse_multipart_form(body, boundary):
"""
Parse a request body and returns fields and files
:param body: bytes request body
:param boundary: bytes multipart boundary
:return: fields (RequestParameters), files (RequestParameters)
"""
files = RequestParameters()
fields = RequestParameters()
form_parts = body.split(boundary)
for form_part in form_parts[1:-1]:
file_name = None
content_type = "text/plain"
content_charset = "utf-8"
field_name = None
line_index = 2
line_end_index = 0
while not line_end_index == -1:
line_end_index = form_part.find(b"\r\n", line_index)
form_line = form_part[line_index:line_end_index].decode("utf-8")
line_index = line_end_index + 2
if not form_line:
break
colon_index = form_line.index(":")
idx = colon_index + 2
form_header_field = form_line[0:colon_index].lower()
form_header_value, form_parameters = parse_content_header(
form_line[idx:]
)
if form_header_field == "content-disposition":
field_name = form_parameters.get("name")
file_name = form_parameters.get("filename")
# non-ASCII filenames in RFC2231, "filename*" format
if file_name is None and form_parameters.get("filename*"):
encoding, _, value = email.utils.decode_rfc2231(
form_parameters["filename*"]
)
file_name = unquote(value, encoding=encoding)
# Normalize to NFC (Apple MacOS/iOS send NFD)
# Notes:
# - No effect for Windows, Linux or Android clients which
# already send NFC
# - Python open() is tricky (creates files in NFC no matter
# which form you use)
if file_name is not None:
file_name = unicodedata.normalize("NFC", file_name)
elif form_header_field == "content-type":
content_type = form_header_value
content_charset = form_parameters.get("charset", "utf-8")
if field_name:
post_data = form_part[line_index:-4]
if file_name is None:
value = post_data.decode(content_charset)
if field_name in fields:
fields[field_name].append(value)
else:
fields[field_name] = [value]
else:
form_file = File(
type=content_type, name=file_name, body=post_data
)
if field_name in files:
files[field_name].append(form_file)
else:
files[field_name] = [form_file]
else:
logger.debug(
"Form-data field does not have a 'name' parameter "
"in the Content-Disposition header"
)
return fields, files

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
from datetime import datetime
from functools import partial
from typing import (
TYPE_CHECKING,
@@ -17,6 +18,7 @@ from typing import (
from sanic.compat import Header
from sanic.cookies import CookieJar
from sanic.cookies.response import Cookie, SameSite
from sanic.exceptions import SanicException, ServerError
from sanic.helpers import (
Default,
@@ -158,6 +160,117 @@ class BaseHTTPResponse:
end_stream=end_stream or False,
)
def add_cookie(
self,
key: str,
value: str,
*,
path: str = "/",
domain: Optional[str] = None,
secure: bool = True,
max_age: Optional[int] = None,
expires: Optional[datetime] = None,
httponly: bool = False,
samesite: Optional[SameSite] = "Lax",
partitioned: bool = False,
comment: Optional[str] = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> Cookie:
"""
Add a cookie to the CookieJar
:param key: Key of the cookie
:type key: str
:param value: Value of the cookie
:type value: str
:param path: Path of the cookie, defaults to None
:type path: Optional[str], optional
:param domain: Domain of the cookie, defaults to None
:type domain: Optional[str], optional
:param secure: Whether to set it as a secure cookie, defaults to True
:type secure: bool
:param max_age: Max age of the cookie in seconds; if set to 0 a
browser should delete it, defaults to None
:type max_age: Optional[int], optional
:param expires: When the cookie expires; if set to None browsers
should set it as a session cookie, defaults to None
:type expires: Optional[datetime], optional
:param httponly: Whether to set it as HTTP only, defaults to False
:type httponly: bool
:param samesite: How to set the samesite property, should be
strict, lax or none (case insensitive), defaults to Lax
:type samesite: Optional[SameSite], optional
:param partitioned: Whether to set it as partitioned, defaults to False
:type partitioned: bool
:param comment: A cookie comment, defaults to None
:type comment: Optional[str], optional
:param host_prefix: Whether to add __Host- as a prefix to the key.
This requires that path="/", domain=None, and secure=True,
defaults to False
:type host_prefix: bool
:param secure_prefix: Whether to add __Secure- as a prefix to the key.
This requires that secure=True, defaults to False
:type secure_prefix: bool
:return: The instance of the created cookie
:rtype: Cookie
"""
return self.cookies.add_cookie(
key=key,
value=value,
path=path,
domain=domain,
secure=secure,
max_age=max_age,
expires=expires,
httponly=httponly,
samesite=samesite,
partitioned=partitioned,
comment=comment,
host_prefix=host_prefix,
secure_prefix=secure_prefix,
)
def delete_cookie(
self,
key: str,
*,
path: str = "/",
domain: Optional[str] = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> None:
"""
Delete a cookie
This will effectively set it as Max-Age: 0, which a browser should
interpret it to mean: "delete the cookie".
Since it is a browser/client implementation, your results may vary
depending upon which client is being used.
:param key: The key to be deleted
:type key: str
:param path: Path of the cookie, defaults to None
:type path: Optional[str], optional
:param domain: Domain of the cookie, defaults to None
:type domain: Optional[str], optional
:param host_prefix: Whether to add __Host- as a prefix to the key.
This requires that path="/", domain=None, and secure=True,
defaults to False
:type host_prefix: bool
:param secure_prefix: Whether to add __Secure- as a prefix to the key.
This requires that secure=True, defaults to False
:type secure_prefix: bool
"""
self.cookies.delete_cookie(
key=key,
path=path,
domain=domain,
host_prefix=host_prefix,
secure_prefix=secure_prefix,
)
class HTTPResponse(BaseHTTPResponse):
"""
@@ -407,6 +520,8 @@ class ResponseStream:
headers: Optional[Union[Header, Dict[str, str]]] = None,
content_type: Optional[str] = None,
):
if not isinstance(headers, Header):
headers = Header(headers)
self.streaming_fn = streaming_fn
self.status = status
self.headers = headers or Header()