Merge branch 'main' into zhiwei/route-overwrite

This commit is contained in:
Zhiwei 2023-03-29 00:12:04 -04:00 committed by GitHub
commit 4536fd0559
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
53 changed files with 1774 additions and 1014 deletions

View File

@ -316,8 +316,6 @@ Version 21.3.0
Version 20.12.3 Version 20.12.3
--------------- ---------------
`Current LTS version`
**Bugfixes** **Bugfixes**
* *

View File

@ -1,6 +1,11 @@
📜 Changelog 📜 Changelog
============ ============
| 🔶 Current release
| 🔷 In support release
|
.. mdinclude:: ./releases/23/23.3.md
.. mdinclude:: ./releases/22/22.12.md .. mdinclude:: ./releases/22/22.12.md
.. mdinclude:: ./releases/22/22.9.md .. mdinclude:: ./releases/22/22.9.md
.. mdinclude:: ./releases/22/22.6.md .. mdinclude:: ./releases/22/22.6.md

View File

@ -1,4 +1,4 @@
## Version 22.12.0 🔶 ## Version 22.12.0 🔷
_Current version_ _Current version_

View File

@ -0,0 +1,53 @@
## Version 23.3.0 🔶
### Features
- [#2545](https://github.com/sanic-org/sanic/pull/2545) Standardize init of exceptions for more consistent control of HTTP responses using exceptions
- [#2606](https://github.com/sanic-org/sanic/pull/2606) Decode headers as UTF-8 also in ASGI
- [#2646](https://github.com/sanic-org/sanic/pull/2646) Separate ASGI request and lifespan callables
- [#2659](https://github.com/sanic-org/sanic/pull/2659) Use ``FALLBACK_ERROR_FORMAT`` for handlers that return ``empty()``
- [#2662](https://github.com/sanic-org/sanic/pull/2662) Add basic file browser (HTML page) and auto-index serving
- [#2667](https://github.com/sanic-org/sanic/pull/2667) Nicer traceback formatting (HTML page)
- [#2668](https://github.com/sanic-org/sanic/pull/2668) Smarter error page rendering format selection; more reliant upon header and "common sense" defaults
- [#2680](https://github.com/sanic-org/sanic/pull/2680) Check the status of socket before shutting down with ``SHUT_RDWR``
- [#2687](https://github.com/sanic-org/sanic/pull/2687) Refresh ``Request.accept`` functionality to be more performant and spec-compliant
- [#2696](https://github.com/sanic-org/sanic/pull/2696) Add header accessors as properties
```
Example-Field: Foo, Bar
Example-Field: Baz
```
```python
request.headers.example_field == "Foo, Bar,Baz"
```
- [#2700](https://github.com/sanic-org/sanic/pull/2700) Simpler CLI targets
```sh
$ sanic path.to.module:app # global app instance
$ sanic path.to.module:create_app # factory pattern
$ sanic ./path/to/directory/ # simple serve
```
- [#2701](https://github.com/sanic-org/sanic/pull/2701) API to define a number of workers in managed processes
- [#2704](https://github.com/sanic-org/sanic/pull/2704) Add convenience for dynamic changes to routing
- [#2706](https://github.com/sanic-org/sanic/pull/2706) Add convenience methods for cookie creation and deletion
```python
response = text("...")
response.add_cookie("test", "It worked!", domain=".yummy-yummy-cookie.com")
```
- [#2707](https://github.com/sanic-org/sanic/pull/2707) Simplified ``parse_content_header`` escaping to be RFC-compliant and remove outdated FF hack
- [#2710](https://github.com/sanic-org/sanic/pull/2710) Stricter charset handling and escaping of request URLs
- [#2711](https://github.com/sanic-org/sanic/pull/2711) Consume body on ``DELETE`` by default
- [#2719](https://github.com/sanic-org/sanic/pull/2719) Allow ``password`` to be passed to TLS context
- [#2720](https://github.com/sanic-org/sanic/pull/2720) Skip middleware on ``RequestCancelled``
- [#2721](https://github.com/sanic-org/sanic/pull/2721) Change access logging format to ``%s``
- [#2722](https://github.com/sanic-org/sanic/pull/2722) Add ``CertLoader`` as application option for directly controlling ``SSLContext`` objects
- [#2725](https://github.com/sanic-org/sanic/pull/2725) Worker sync state tolerance on race condition
### Bugfixes
- [#2651](https://github.com/sanic-org/sanic/pull/2651) ASGI websocket to pass thru bytes as is
- [#2697](https://github.com/sanic-org/sanic/pull/2697) Fix comparison between datetime aware and naive in ``file`` when using ``If-Modified-Since``
### Deprecations and Removals
- [#2666](https://github.com/sanic-org/sanic/pull/2666) Remove deprecated ``__blueprintname__`` property
### Improved Documentation
- [#2712](https://github.com/sanic-org/sanic/pull/2712) Improved example using ``'https'`` to create the redirect

View File

@ -1 +1 @@
__version__ = "22.12.0" __version__ = "23.3.0"

View File

@ -64,12 +64,7 @@ from sanic.exceptions import (
from sanic.handlers import ErrorHandler from sanic.handlers import ErrorHandler
from sanic.helpers import Default, _default from sanic.helpers import Default, _default
from sanic.http import Stage from sanic.http import Stage
from sanic.log import ( from sanic.log import LOGGING_CONFIG_DEFAULTS, error_logger, logger
LOGGING_CONFIG_DEFAULTS,
deprecation,
error_logger,
logger,
)
from sanic.middleware import Middleware, MiddlewareLocation from sanic.middleware import Middleware, MiddlewareLocation
from sanic.mixins.listeners import ListenerEvent from sanic.mixins.listeners import ListenerEvent
from sanic.mixins.startup import StartupMixin from sanic.mixins.startup import StartupMixin
@ -1587,17 +1582,20 @@ class Sanic(StaticHandleMixin, BaseSanic, StartupMixin, metaclass=TouchUpMeta):
self.signalize(self.config.TOUCHUP) self.signalize(self.config.TOUCHUP)
self.finalize() self.finalize()
route_names = [route.name for route in self.router.routes] route_names = [route.extra.ident for route in self.router.routes]
duplicates = { duplicates = {
name for name in route_names if route_names.count(name) > 1 name for name in route_names if route_names.count(name) > 1
} }
if duplicates: if duplicates:
names = ", ".join(duplicates) names = ", ".join(duplicates)
deprecation( message = (
f"Duplicate route names detected: {names}. In the future, " f"Duplicate route names detected: {names}. You should rename "
"Sanic will enforce uniqueness in route naming.", "one or more of them explicitly by using the `name` param, "
23.3, "or changing the implicit name derived from the class and "
"function name. For more details, please see "
"https://sanic.dev/en/guide/release-notes/v23.3.html#duplicated-route-names-are-no-longer-allowed" # noqa
) )
raise ServerError(message)
Sanic._check_uvloop_conflict() Sanic._check_uvloop_conflict()

View File

@ -3,7 +3,6 @@ from __future__ import annotations
import warnings import warnings
from typing import TYPE_CHECKING, Optional from typing import TYPE_CHECKING, Optional
from urllib.parse import quote
from sanic.compat import Header from sanic.compat import Header
from sanic.exceptions import BadRequest, ServerError from sanic.exceptions import BadRequest, ServerError
@ -146,14 +145,6 @@ class ASGIApp:
raise BadRequest( raise BadRequest(
"Header names can only contain US-ASCII characters" "Header names can only contain US-ASCII characters"
) )
path = (
scope["path"][1:]
if scope["path"].startswith("/")
else scope["path"]
)
url = "/".join([scope.get("root_path", ""), quote(path)])
url_bytes = url.encode("latin-1")
url_bytes += b"?" + scope["query_string"]
if scope["type"] == "http": if scope["type"] == "http":
version = scope["http_version"] version = scope["http_version"]
@ -168,6 +159,13 @@ class ASGIApp:
else: else:
raise ServerError("Received unknown ASGI scope") raise ServerError("Received unknown ASGI scope")
url_bytes, query = scope["raw_path"], scope["query_string"]
if query:
# httpx ASGI client sends query string as part of raw_path
url_bytes = url_bytes.split(b"?", 1)[0]
# All servers send them separately
url_bytes = b"%b?%b" % (url_bytes, query)
request_class = sanic_app.request_class or Request request_class = sanic_app.request_class or Request
instance.request = request_class( instance.request = request_class(
url_bytes, url_bytes,

View File

@ -94,6 +94,7 @@ class Blueprint(BaseSanic):
"_future_exceptions", "_future_exceptions",
"_future_signals", "_future_signals",
"allow_route_overwrite", "allow_route_overwrite",
"copied_from",
"ctx", "ctx",
"exceptions", "exceptions",
"host", "host",
@ -121,6 +122,7 @@ class Blueprint(BaseSanic):
super().__init__(name=name) super().__init__(name=name)
self.reset() self.reset()
self.allow_route_overwrite = allow_route_overwrite self.allow_route_overwrite = allow_route_overwrite
self.copied_from = ""
self.ctx = SimpleNamespace() self.ctx = SimpleNamespace()
self.host = host self.host = host
self.strict_slashes = strict_slashes self.strict_slashes = strict_slashes
@ -218,6 +220,7 @@ class Blueprint(BaseSanic):
self.reset() self.reset()
new_bp = deepcopy(self) new_bp = deepcopy(self)
new_bp.name = name new_bp.name = name
new_bp.copied_from = self.name
if not isinstance(url_prefix, Default): if not isinstance(url_prefix, Default):
new_bp.url_prefix = url_prefix new_bp.url_prefix = url_prefix
@ -361,6 +364,17 @@ class Blueprint(BaseSanic):
route = app._apply_route( route = app._apply_route(
apply_route, overwrite=self.allow_route_overwrite apply_route, overwrite=self.allow_route_overwrite
) )
route = app._apply_route(apply_route)
# If it is a copied BP, then make sure all of the names of routes
# matchup with the new BP name
if self.copied_from:
for r in route:
r.name = r.name.replace(self.copied_from, self.name)
r.extra.ident = r.extra.ident.replace(
self.copied_from, self.name
)
operation = ( operation = (
routes.extend if isinstance(route, list) else routes.append routes.extend if isinstance(route, list) else routes.append
) )

View File

@ -1,4 +1,3 @@
import logging
import os import os
import shutil import shutil
import sys import sys
@ -6,7 +5,7 @@ import sys
from argparse import Namespace from argparse import Namespace
from functools import partial from functools import partial
from textwrap import indent from textwrap import indent
from typing import List, Union, cast from typing import List, Union
from sanic.app import Sanic from sanic.app import Sanic
from sanic.application.logo import get_logo from sanic.application.logo import get_logo
@ -14,7 +13,7 @@ from sanic.cli.arguments import Group
from sanic.cli.base import SanicArgumentParser, SanicHelpFormatter from sanic.cli.base import SanicArgumentParser, SanicHelpFormatter
from sanic.cli.inspector import make_inspector_parser from sanic.cli.inspector import make_inspector_parser
from sanic.cli.inspector_client import InspectorClient from sanic.cli.inspector_client import InspectorClient
from sanic.log import Colors, error_logger from sanic.log import error_logger
from sanic.worker.loader import AppLoader from sanic.worker.loader import AppLoader
@ -24,17 +23,22 @@ class SanicCLI:
{get_logo(True)} {get_logo(True)}
To start running a Sanic application, provide a path to the module, where To start running a Sanic application, provide a path to the module, where
app is a Sanic() instance: app is a Sanic() instance in the global scope:
$ sanic path.to.server:app $ sanic path.to.server:app
If the Sanic instance variable is called 'app', you can leave off the last
part, and only provide a path to the module where the instance is:
$ sanic path.to.server
Or, a path to a callable that returns a Sanic() instance: Or, a path to a callable that returns a Sanic() instance:
$ sanic path.to.factory:create_app --factory $ sanic path.to.factory:create_app
Or, a path to a directory to run as a simple HTTP server: Or, a path to a directory to run as a simple HTTP server:
$ sanic ./path/to/static --simple $ sanic ./path/to/static
""", """,
prefix=" ", prefix=" ",
) )
@ -95,13 +99,9 @@ Or, a path to a directory to run as a simple HTTP server:
self.args = self.parser.parse_args(args=parse_args) self.args = self.parser.parse_args(args=parse_args)
self._precheck() self._precheck()
app_loader = AppLoader( app_loader = AppLoader(
self.args.module, self.args.factory, self.args.simple, self.args self.args.target, self.args.factory, self.args.simple, self.args
) )
if self.args.inspect or self.args.inspect_raw or self.args.trigger:
self._inspector_legacy(app_loader)
return
try: try:
app = self._get_app(app_loader) app = self._get_app(app_loader)
kwargs = self._build_run_kwargs() kwargs = self._build_run_kwargs()
@ -112,38 +112,10 @@ Or, a path to a directory to run as a simple HTTP server:
app.prepare(**kwargs, version=http_version) app.prepare(**kwargs, version=http_version)
if self.args.single: if self.args.single:
serve = Sanic.serve_single serve = Sanic.serve_single
elif self.args.legacy:
serve = Sanic.serve_legacy
else: else:
serve = partial(Sanic.serve, app_loader=app_loader) serve = partial(Sanic.serve, app_loader=app_loader)
serve(app) serve(app)
def _inspector_legacy(self, app_loader: AppLoader):
host = port = None
module = cast(str, self.args.module)
if ":" in module:
maybe_host, maybe_port = module.rsplit(":", 1)
if maybe_port.isnumeric():
host, port = maybe_host, int(maybe_port)
if not host:
app = self._get_app(app_loader)
host, port = app.config.INSPECTOR_HOST, app.config.INSPECTOR_PORT
action = self.args.trigger or "info"
InspectorClient(
str(host), int(port or 6457), False, self.args.inspect_raw, ""
).do(action)
sys.stdout.write(
f"\n{Colors.BOLD}{Colors.YELLOW}WARNING:{Colors.END} "
"You are using the legacy CLI command that will be removed in "
f"{Colors.RED}v23.3{Colors.END}. See "
"https://sanic.dev/en/guide/release-notes/v22.12.html"
"#deprecations-and-removals or checkout the new "
"style commands:\n\n\t"
f"{Colors.YELLOW}sanic inspect --help{Colors.END}\n"
)
def _inspector(self): def _inspector(self):
args = sys.argv[2:] args = sys.argv[2:]
self.args, unknown = self.parser.parse_known_args(args=args) self.args, unknown = self.parser.parse_known_args(args=args)
@ -197,8 +169,6 @@ Or, a path to a directory to run as a simple HTTP server:
) )
error_logger.error(message) error_logger.error(message)
sys.exit(1) sys.exit(1)
if self.args.inspect or self.args.inspect_raw:
logging.disable(logging.CRITICAL)
def _get_app(self, app_loader: AppLoader): def _get_app(self, app_loader: AppLoader):
try: try:
@ -246,7 +216,6 @@ Or, a path to a directory to run as a simple HTTP server:
"workers": self.args.workers, "workers": self.args.workers,
"auto_tls": self.args.auto_tls, "auto_tls": self.args.auto_tls,
"single_process": self.args.single, "single_process": self.args.single,
"legacy": self.args.legacy,
} }
for maybe_arg in ("auto_reload", "dev"): for maybe_arg in ("auto_reload", "dev"):

View File

@ -57,11 +57,15 @@ class GeneralGroup(Group):
) )
self.container.add_argument( self.container.add_argument(
"module", "target",
help=( help=(
"Path to your Sanic app. Example: path.to.server:app\n" "Path to your Sanic app instance.\n"
"If running a Simple Server, path to directory to serve. " "\tExample: path.to.server:app\n"
"Example: ./\n" "If running a Simple Server, path to directory to serve.\n"
"\tExample: ./\n"
"Additionally, this can be a path to a factory function\n"
"that returns a Sanic app instance.\n"
"\tExample: path.to.server:create_app\n"
), ),
) )
@ -89,32 +93,6 @@ class ApplicationGroup(Group):
"a directory\n(module arg should be a path)" "a directory\n(module arg should be a path)"
), ),
) )
group.add_argument(
"--inspect",
dest="inspect",
action="store_true",
help=("Inspect the state of a running instance, human readable"),
)
group.add_argument(
"--inspect-raw",
dest="inspect_raw",
action="store_true",
help=("Inspect the state of a running instance, JSON output"),
)
group.add_argument(
"--trigger-reload",
dest="trigger",
action="store_const",
const="reload",
help=("Trigger worker processes to reload"),
)
group.add_argument(
"--trigger-shutdown",
dest="trigger",
action="store_const",
const="shutdown",
help=("Trigger all processes to shutdown"),
)
class HTTPVersionGroup(Group): class HTTPVersionGroup(Group):
@ -243,11 +221,6 @@ class WorkerGroup(Group):
action="store_true", action="store_true",
help="Do not use multiprocessing, run server in a single process", help="Do not use multiprocessing, run server in a single process",
) )
self.container.add_argument(
"--legacy",
action="store_true",
help="Use the legacy server manager",
)
self.add_bool_arguments( self.add_bool_arguments(
"--access-logs", "--access-logs",
dest="access_log", dest="access_log",

View File

@ -1,156 +0,0 @@
import re
import string
from datetime import datetime
from typing import Dict
DEFAULT_MAX_AGE = 0
# ------------------------------------------------------------ #
# SimpleCookie
# ------------------------------------------------------------ #
# Straight up copied this section of dark magic from SimpleCookie
_LegalChars = string.ascii_letters + string.digits + "!#$%&'*+-.^_`|~:"
_UnescapedChars = _LegalChars + " ()/<=>?@[]{}"
_Translator = {
n: "\\%03o" % n for n in set(range(256)) - set(map(ord, _UnescapedChars))
}
_Translator.update({ord('"'): '\\"', ord("\\"): "\\\\"})
def _quote(str):
r"""Quote a string for use in a cookie header.
If the string does not need to be double-quoted, then just return the
string. Otherwise, surround the string in doublequotes and quote
(with a \) special characters.
"""
if str is None or _is_legal_key(str):
return str
else:
return '"' + str.translate(_Translator) + '"'
_is_legal_key = re.compile("[%s]+" % re.escape(_LegalChars)).fullmatch
# ------------------------------------------------------------ #
# Custom SimpleCookie
# ------------------------------------------------------------ #
class CookieJar(dict):
"""
CookieJar dynamically writes headers as cookies are added and removed
It gets around the limitation of one header per name by using the
MultiHeader class to provide a unique key that encodes to Set-Cookie.
"""
def __init__(self, headers):
super().__init__()
self.headers: Dict[str, str] = headers
self.cookie_headers: Dict[str, str] = {}
self.header_key: str = "Set-Cookie"
def __setitem__(self, key, value):
# If this cookie doesn't exist, add it to the header keys
if not self.cookie_headers.get(key):
cookie = Cookie(key, value)
cookie["path"] = "/"
self.cookie_headers[key] = self.header_key
self.headers.add(self.header_key, cookie)
return super().__setitem__(key, cookie)
else:
self[key].value = value
def __delitem__(self, key):
if key not in self.cookie_headers:
self[key] = ""
self[key]["max-age"] = 0
else:
cookie_header = self.cookie_headers[key]
# remove it from header
cookies = self.headers.popall(cookie_header)
for cookie in cookies:
if cookie.key != key:
self.headers.add(cookie_header, cookie)
del self.cookie_headers[key]
return super().__delitem__(key)
class Cookie(dict):
"""A stripped down version of Morsel from SimpleCookie #gottagofast"""
_keys = {
"expires": "expires",
"path": "Path",
"comment": "Comment",
"domain": "Domain",
"max-age": "Max-Age",
"secure": "Secure",
"httponly": "HttpOnly",
"version": "Version",
"samesite": "SameSite",
}
_flags = {"secure", "httponly"}
def __init__(self, key, value):
if key in self._keys:
raise KeyError("Cookie name is a reserved word")
if not _is_legal_key(key):
raise KeyError("Cookie key contains illegal characters")
self.key = key
self.value = value
super().__init__()
def __setitem__(self, key, value):
if key not in self._keys:
raise KeyError("Unknown cookie property")
if value is not False:
if key.lower() == "max-age":
if not str(value).isdigit():
raise ValueError("Cookie max-age must be an integer")
elif key.lower() == "expires":
if not isinstance(value, datetime):
raise TypeError(
"Cookie 'expires' property must be a datetime"
)
return super().__setitem__(key, value)
def encode(self, encoding):
"""
Encode the cookie content in a specific type of encoding instructed
by the developer. Leverages the :func:`str.encode` method provided
by python.
This method can be used to encode and embed ``utf-8`` content into
the cookies.
:param encoding: Encoding to be used with the cookie
:return: Cookie encoded in a codec of choosing.
:except: UnicodeEncodeError
"""
return str(self).encode(encoding)
def __str__(self):
"""Format as a Set-Cookie header value."""
output = ["%s=%s" % (self.key, _quote(self.value))]
for key, value in self.items():
if key == "max-age":
try:
output.append("%s=%d" % (self._keys[key], value))
except TypeError:
output.append("%s=%s" % (self._keys[key], value))
elif key == "expires":
output.append(
"%s=%s"
% (self._keys[key], value.strftime("%a, %d-%b-%Y %T GMT"))
)
elif key in self._flags and self[key]:
output.append(self._keys[key])
else:
output.append("%s=%s" % (self._keys[key], value))
return "; ".join(output)

View File

@ -0,0 +1,4 @@
from .response import Cookie, CookieJar
__all__ = ("Cookie", "CookieJar")

119
sanic/cookies/request.py Normal file
View File

@ -0,0 +1,119 @@
import re
from typing import Any, Dict, List, Optional
from sanic.cookies.response import Cookie
from sanic.log import deprecation
from sanic.request.parameters import RequestParameters
COOKIE_NAME_RESERVED_CHARS = re.compile(
'[\x00-\x1F\x7F-\xFF()<>@,;:\\\\"/[\\]?={} \x09]'
)
OCTAL_PATTERN = re.compile(r"\\[0-3][0-7][0-7]")
QUOTE_PATTERN = re.compile(r"[\\].")
def _unquote(str): # no cov
if str is None or len(str) < 2:
return str
if str[0] != '"' or str[-1] != '"':
return str
str = str[1:-1]
i = 0
n = len(str)
res = []
while 0 <= i < n:
o_match = OCTAL_PATTERN.search(str, i)
q_match = QUOTE_PATTERN.search(str, i)
if not o_match and not q_match:
res.append(str[i:])
break
# else:
j = k = -1
if o_match:
j = o_match.start(0)
if q_match:
k = q_match.start(0)
if q_match and (not o_match or k < j):
res.append(str[i:k])
res.append(str[k + 1])
i = k + 2
else:
res.append(str[i:j])
res.append(chr(int(str[j + 1 : j + 4], 8))) # noqa: E203
i = j + 4
return "".join(res)
def parse_cookie(raw: str):
cookies: Dict[str, List] = {}
for token in raw.split(";"):
name, __, value = token.partition("=")
name = name.strip()
value = value.strip()
if not name:
continue
if COOKIE_NAME_RESERVED_CHARS.search(name): # no cov
continue
if len(value) > 2 and value[0] == '"' and value[-1] == '"': # no cov
value = _unquote(value)
if name in cookies:
cookies[name].append(value)
else:
cookies[name] = [value]
return cookies
class CookieRequestParameters(RequestParameters):
def __getitem__(self, key: str) -> Optional[str]:
deprecation(
f"You are accessing cookie key '{key}', which is currently in "
"compat mode returning a single cookie value. Starting in v24.3 "
"accessing a cookie value like this will return a list of values. "
"To avoid this behavior and continue accessing a single value, "
f"please upgrade from request.cookies['{key}'] to "
f"request.cookies.get('{key}'). See more details: "
"https://sanic.dev/en/guide/release-notes/v23.3.html#request-cookies", # noqa
24.3,
)
try:
value = self._get_prefixed_cookie(key)
except KeyError:
value = super().__getitem__(key)
return value[0]
def __getattr__(self, key: str) -> str:
if key.startswith("_"):
return self.__getattribute__(key)
key = key.rstrip("_").replace("_", "-")
return str(self.get(key, ""))
def get(self, name: str, default: Optional[Any] = None) -> Optional[Any]:
try:
return self._get_prefixed_cookie(name)[0]
except KeyError:
return super().get(name, default)
def getlist(
self, name: str, default: Optional[Any] = None
) -> Optional[Any]:
try:
return self._get_prefixed_cookie(name)
except KeyError:
return super().getlist(name, default)
def _get_prefixed_cookie(self, name: str) -> Any:
getitem = super().__getitem__
try:
return getitem(f"{Cookie.HOST_PREFIX}{name}")
except KeyError:
return getitem(f"{Cookie.SECURE_PREFIX}{name}")

617
sanic/cookies/response.py Normal file
View File

@ -0,0 +1,617 @@
from __future__ import annotations
import re
import string
import sys
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from sanic.exceptions import ServerError
from sanic.log import deprecation
if TYPE_CHECKING:
from sanic.compat import Header
if sys.version_info < (3, 8): # no cov
SameSite = str
else: # no cov
from typing import Literal
SameSite = Union[
Literal["Strict"],
Literal["Lax"],
Literal["None"],
Literal["strict"],
Literal["lax"],
Literal["none"],
]
DEFAULT_MAX_AGE = 0
SAMESITE_VALUES = ("strict", "lax", "none")
LEGAL_CHARS = string.ascii_letters + string.digits + "!#$%&'*+-.^_`|~:"
UNESCAPED_CHARS = LEGAL_CHARS + " ()/<=>?@[]{}"
TRANSLATOR = {ch: f"\\{ch:03o}" for ch in bytes(range(32)) + b'";\\\x7F'}
def _quote(str): # no cov
r"""Quote a string for use in a cookie header.
If the string does not need to be double-quoted, then just return the
string. Otherwise, surround the string in doublequotes and quote
(with a \) special characters.
"""
if str is None or _is_legal_key(str):
return str
else:
return f'"{str.translate(TRANSLATOR)}"'
_is_legal_key = re.compile("[%s]+" % re.escape(LEGAL_CHARS)).fullmatch
# In v24.3, we should remove this as being a subclass of dict
class CookieJar(dict):
"""
CookieJar dynamically writes headers as cookies are added and removed
It gets around the limitation of one header per name by using the
MultiHeader class to provide a unique key that encodes to Set-Cookie.
"""
HEADER_KEY = "Set-Cookie"
def __init__(self, headers: Header):
super().__init__()
self.headers = headers
def __setitem__(self, key, value):
# If this cookie doesn't exist, add it to the header keys
deprecation(
"Setting cookie values using the dict pattern has been "
"deprecated. You should instead use the cookies.add_cookie "
"method. To learn more, please see: "
"https://sanic.dev/en/guide/release-notes/v23.3.html#response-cookies", # noqa
0,
)
if key not in self:
self.add_cookie(key, value, secure=False, samesite=None)
else:
self[key].value = value
def __delitem__(self, key):
deprecation(
"Deleting cookie values using the dict pattern has been "
"deprecated. You should instead use the cookies.delete_cookie "
"method. To learn more, please see: "
"https://sanic.dev/en/guide/release-notes/v23.3.html#response-cookies", # noqa
0,
)
if key in self:
super().__delitem__(key)
self.delete_cookie(key)
def __len__(self): # no cov
return len(self.cookies)
def __getitem__(self, key: str) -> Cookie:
deprecation(
"Accessing cookies from the CookieJar by dict key is deprecated. "
"You should instead use the cookies.get_cookie method. "
"To learn more, please see: "
"https://sanic.dev/en/guide/release-notes/v23.3.html#response-cookies", # noqa
0,
)
return super().__getitem__(key)
def __iter__(self): # no cov
deprecation(
"Iterating over the CookieJar has been deprecated and will be "
"removed in v24.3. To learn more, please see: "
"https://sanic.dev/en/guide/release-notes/v23.3.html#response-cookies", # noqa
24.3,
)
return super().__iter__()
def keys(self): # no cov
deprecation(
"Accessing CookieJar.keys() has been deprecated and will be "
"removed in v24.3. To learn more, please see: "
"https://sanic.dev/en/guide/release-notes/v23.3.html#response-cookies", # noqa
24.3,
)
return super().keys()
def values(self): # no cov
deprecation(
"Accessing CookieJar.values() has been deprecated and will be "
"removed in v24.3. To learn more, please see: "
"https://sanic.dev/en/guide/release-notes/v23.3.html#response-cookies", # noqa
24.3,
)
return super().values()
def items(self): # no cov
deprecation(
"Accessing CookieJar.items() has been deprecated and will be "
"removed in v24.3. To learn more, please see: "
"https://sanic.dev/en/guide/release-notes/v23.3.html#response-cookies", # noqa
24.3,
)
return super().items()
def get(self, *args, **kwargs): # no cov
deprecation(
"Accessing cookies from the CookieJar using get is deprecated "
"and will be removed in v24.3. You should instead use the "
"cookies.get_cookie method. To learn more, please see: "
"https://sanic.dev/en/guide/release-notes/v23.3.html#response-cookies", # noqa
24.3,
)
return super().get(*args, **kwargs)
def pop(self, key, *args, **kwargs): # no cov
deprecation(
"Using CookieJar.pop() has been deprecated and will be "
"removed in v24.3. To learn more, please see: "
"https://sanic.dev/en/guide/release-notes/v23.3.html#response-cookies", # noqa
24.3,
)
self.delete(key)
return super().pop(key, *args, **kwargs)
@property
def header_key(self): # no cov
deprecation(
"The CookieJar.header_key property has been deprecated and will "
"be removed in version 24.3. Use CookieJar.HEADER_KEY. ",
24.3,
)
return CookieJar.HEADER_KEY
@property
def cookie_headers(self) -> Dict[str, str]: # no cov
deprecation(
"The CookieJar.coookie_headers property has been deprecated "
"and will be removed in version 24.3. If you need to check if a "
"particular cookie key has been set, use CookieJar.has_cookie.",
24.3,
)
return {key: self.header_key for key in self}
@property
def cookies(self) -> List[Cookie]:
return self.headers.getall(self.HEADER_KEY)
def get_cookie(
self,
key: str,
path: str = "/",
domain: Optional[str] = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> Optional[Cookie]:
for cookie in self.cookies:
if (
cookie.key == Cookie.make_key(key, host_prefix, secure_prefix)
and cookie.path == path
and cookie.domain == domain
):
return cookie
return None
def has_cookie(
self,
key: str,
path: str = "/",
domain: Optional[str] = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> bool:
for cookie in self.cookies:
if (
cookie.key == Cookie.make_key(key, host_prefix, secure_prefix)
and cookie.path == path
and cookie.domain == domain
):
return True
return False
def add_cookie(
self,
key: str,
value: str,
*,
path: str = "/",
domain: Optional[str] = None,
secure: bool = True,
max_age: Optional[int] = None,
expires: Optional[datetime] = None,
httponly: bool = False,
samesite: Optional[SameSite] = "Lax",
partitioned: bool = False,
comment: Optional[str] = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> Cookie:
"""
Add a cookie to the CookieJar
:param key: Key of the cookie
:type key: str
:param value: Value of the cookie
:type value: str
:param path: Path of the cookie, defaults to None
:type path: Optional[str], optional
:param domain: Domain of the cookie, defaults to None
:type domain: Optional[str], optional
:param secure: Whether to set it as a secure cookie, defaults to True
:type secure: bool
:param max_age: Max age of the cookie in seconds; if set to 0 a
browser should delete it, defaults to None
:type max_age: Optional[int], optional
:param expires: When the cookie expires; if set to None browsers
should set it as a session cookie, defaults to None
:type expires: Optional[datetime], optional
:param httponly: Whether to set it as HTTP only, defaults to False
:type httponly: bool
:param samesite: How to set the samesite property, should be
strict, lax or none (case insensitive), defaults to Lax
:type samesite: Optional[SameSite], optional
:param partitioned: Whether to set it as partitioned, defaults to False
:type partitioned: bool
:param comment: A cookie comment, defaults to None
:type comment: Optional[str], optional
:param host_prefix: Whether to add __Host- as a prefix to the key.
This requires that path="/", domain=None, and secure=True,
defaults to False
:type host_prefix: bool
:param secure_prefix: Whether to add __Secure- as a prefix to the key.
This requires that secure=True, defaults to False
:type secure_prefix: bool
:return: The instance of the created cookie
:rtype: Cookie
"""
cookie = Cookie(
key,
value,
path=path,
expires=expires,
comment=comment,
domain=domain,
max_age=max_age,
secure=secure,
httponly=httponly,
samesite=samesite,
partitioned=partitioned,
host_prefix=host_prefix,
secure_prefix=secure_prefix,
)
self.headers.add(self.HEADER_KEY, cookie)
# This should be removed in v24.3
super().__setitem__(key, cookie)
return cookie
def delete_cookie(
self,
key: str,
*,
path: str = "/",
domain: Optional[str] = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> None:
"""
Delete a cookie
This will effectively set it as Max-Age: 0, which a browser should
interpret it to mean: "delete the cookie".
Since it is a browser/client implementation, your results may vary
depending upon which client is being used.
:param key: The key to be deleted
:type key: str
:param path: Path of the cookie, defaults to None
:type path: Optional[str], optional
:param domain: Domain of the cookie, defaults to None
:type domain: Optional[str], optional
:param host_prefix: Whether to add __Host- as a prefix to the key.
This requires that path="/", domain=None, and secure=True,
defaults to False
:type host_prefix: bool
:param secure_prefix: Whether to add __Secure- as a prefix to the key.
This requires that secure=True, defaults to False
:type secure_prefix: bool
"""
# remove it from header
cookies: List[Cookie] = self.headers.popall(self.HEADER_KEY, [])
for cookie in cookies:
if (
cookie.key != Cookie.make_key(key, host_prefix, secure_prefix)
or cookie.path != path
or cookie.domain != domain
):
self.headers.add(self.HEADER_KEY, cookie)
# This should be removed in v24.3
try:
super().__delitem__(key)
except KeyError:
...
self.add_cookie(
key=key,
value="",
path=path,
domain=domain,
max_age=0,
samesite=None,
host_prefix=host_prefix,
secure_prefix=secure_prefix,
)
# In v24.3, we should remove this as being a subclass of dict
# Instead, it should be an object with __slots__
# All of the current property accessors should be removed in favor
# of actual slotted properties.
class Cookie(dict):
"""A stripped down version of Morsel from SimpleCookie"""
HOST_PREFIX = "__Host-"
SECURE_PREFIX = "__Secure-"
_keys = {
"path": "Path",
"comment": "Comment",
"domain": "Domain",
"max-age": "Max-Age",
"expires": "expires",
"samesite": "SameSite",
"version": "Version",
"secure": "Secure",
"httponly": "HttpOnly",
"partitioned": "Partitioned",
}
_flags = {"secure", "httponly", "partitioned"}
def __init__(
self,
key: str,
value: str,
*,
path: str = "/",
domain: Optional[str] = None,
secure: bool = True,
max_age: Optional[int] = None,
expires: Optional[datetime] = None,
httponly: bool = False,
samesite: Optional[SameSite] = "Lax",
partitioned: bool = False,
comment: Optional[str] = None,
host_prefix: bool = False,
secure_prefix: bool = False,
):
if key in self._keys:
raise KeyError("Cookie name is a reserved word")
if not _is_legal_key(key):
raise KeyError("Cookie key contains illegal characters")
if host_prefix:
if not secure:
raise ServerError(
"Cannot set host_prefix on a cookie without secure=True"
)
if path != "/":
raise ServerError(
"Cannot set host_prefix on a cookie unless path='/'"
)
if domain:
raise ServerError(
"Cannot set host_prefix on a cookie with a defined domain"
)
elif secure_prefix and not secure:
raise ServerError(
"Cannot set secure_prefix on a cookie without secure=True"
)
if partitioned and not host_prefix:
# This is technically possible, but it is not advisable so we will
# take a stand and say "don't shoot yourself in the foot"
raise ServerError(
"Cannot create a partitioned cookie without "
"also setting host_prefix=True"
)
self.key = self.make_key(key, host_prefix, secure_prefix)
self.value = value
super().__init__()
# This is a temporary solution while this object is a dict. We update
# all of the values in bulk, except for the values that have
# key-specific validation in _set_value
self.update(
{
"path": path,
"comment": comment,
"domain": domain,
"secure": secure,
"httponly": httponly,
"partitioned": partitioned,
"expires": None,
"max-age": None,
"samesite": None,
}
)
if expires is not None:
self._set_value("expires", expires)
if max_age is not None:
self._set_value("max-age", max_age)
if samesite is not None:
self._set_value("samesite", samesite)
def __setitem__(self, key, value):
deprecation(
"Setting values on a Cookie object as a dict has been deprecated. "
"This feature will be removed in v24.3. You should instead set "
f"values on cookies as object properties: cookie.{key}=... ",
24.3,
)
self._set_value(key, value)
# This is a temporary method for backwards compat and should be removed
# in v24.3 when this is no longer a dict
def _set_value(self, key: str, value: Any) -> None:
if key not in self._keys:
raise KeyError("Unknown cookie property: %s=%s" % (key, value))
if value is not None:
if key.lower() == "max-age" and not str(value).isdigit():
raise ValueError("Cookie max-age must be an integer")
elif key.lower() == "expires" and not isinstance(value, datetime):
raise TypeError("Cookie 'expires' property must be a datetime")
elif key.lower() == "samesite":
if value.lower() not in SAMESITE_VALUES:
raise TypeError(
"Cookie 'samesite' property must "
f"be one of: {','.join(SAMESITE_VALUES)}"
)
value = value.title()
super().__setitem__(key, value)
def encode(self, encoding):
"""
Encode the cookie content in a specific type of encoding instructed
by the developer. Leverages the :func:`str.encode` method provided
by python.
This method can be used to encode and embed ``utf-8`` content into
the cookies.
:param encoding: Encoding to be used with the cookie
:return: Cookie encoded in a codec of choosing.
:except: UnicodeEncodeError
"""
deprecation(
"Direct encoding of a Cookie object has been deprecated and will "
"be removed in v24.3.",
24.3,
)
return str(self).encode(encoding)
def __str__(self):
"""Format as a Set-Cookie header value."""
output = ["%s=%s" % (self.key, _quote(self.value))]
key_index = list(self._keys)
for key, value in sorted(
self.items(), key=lambda x: key_index.index(x[0])
):
if value is not None and value is not False:
if key == "max-age":
try:
output.append("%s=%d" % (self._keys[key], value))
except TypeError:
output.append("%s=%s" % (self._keys[key], value))
elif key == "expires":
output.append(
"%s=%s"
% (
self._keys[key],
value.strftime("%a, %d-%b-%Y %T GMT"),
)
)
elif key in self._flags:
output.append(self._keys[key])
else:
output.append("%s=%s" % (self._keys[key], value))
return "; ".join(output)
@property
def path(self) -> str: # no cov
return self["path"]
@path.setter
def path(self, value: str) -> None: # no cov
self._set_value("path", value)
@property
def expires(self) -> Optional[datetime]: # no cov
return self.get("expires")
@expires.setter
def expires(self, value: datetime) -> None: # no cov
self._set_value("expires", value)
@property
def comment(self) -> Optional[str]: # no cov
return self.get("comment")
@comment.setter
def comment(self, value: str) -> None: # no cov
self._set_value("comment", value)
@property
def domain(self) -> Optional[str]: # no cov
return self.get("domain")
@domain.setter
def domain(self, value: str) -> None: # no cov
self._set_value("domain", value)
@property
def max_age(self) -> Optional[int]: # no cov
return self.get("max-age")
@max_age.setter
def max_age(self, value: int) -> None: # no cov
self._set_value("max-age", value)
@property
def secure(self) -> bool: # no cov
return self.get("secure", False)
@secure.setter
def secure(self, value: bool) -> None: # no cov
self._set_value("secure", value)
@property
def httponly(self) -> bool: # no cov
return self.get("httponly", False)
@httponly.setter
def httponly(self, value: bool) -> None: # no cov
self._set_value("httponly", value)
@property
def samesite(self) -> Optional[SameSite]: # no cov
return self.get("samesite")
@samesite.setter
def samesite(self, value: SameSite) -> None: # no cov
self._set_value("samesite", value)
@property
def partitioned(self) -> bool: # no cov
return self.get("partitioned", False)
@partitioned.setter
def partitioned(self, value: bool) -> None: # no cov
self._set_value("partitioned", value)
@classmethod
def make_key(
cls, key: str, host_prefix: bool = False, secure_prefix: bool = False
) -> str:
if host_prefix and secure_prefix:
raise ServerError(
"Both host_prefix and secure_prefix were requested. "
"A cookie should have only one prefix."
)
elif host_prefix:
key = cls.HOST_PREFIX + key
elif secure_prefix:
key = cls.SECURE_PREFIX + key
return key

View File

@ -3,7 +3,8 @@ from __future__ import annotations
from typing import Dict, List, Optional, Tuple, Type from typing import Dict, List, Optional, Tuple, Type
from sanic.errorpages import BaseRenderer, TextRenderer, exception_response from sanic.errorpages import BaseRenderer, TextRenderer, exception_response
from sanic.log import deprecation, error_logger from sanic.exceptions import ServerError
from sanic.log import error_logger
from sanic.models.handler_types import RouteHandler from sanic.models.handler_types import RouteHandler
from sanic.response import text from sanic.response import text
@ -43,16 +44,11 @@ class ErrorHandler:
if name is None: if name is None:
name = "__ALL_ROUTES__" name = "__ALL_ROUTES__"
error_logger.warning( message = (
f"Duplicate exception handler definition on: route={name} " f"Duplicate exception handler definition on: route={name} "
f"and exception={exc}" f"and exception={exc}"
) )
deprecation( raise ServerError(message)
"A duplicate exception handler definition was discovered. "
"This may cause unintended consequences. A warning has been "
"issued now, but it will not be allowed starting in v23.3.",
23.3,
)
self.cached_handlers[key] = handler self.cached_handlers[key] = handler
def add(self, exception, handler, route_names: Optional[List[str]] = None): def add(self, exception, handler, route_names: Optional[List[str]] = None):

View File

@ -240,9 +240,14 @@ class Http(Stream, metaclass=TouchUpMeta):
headers_instance.getone("upgrade", "").lower() == "websocket" headers_instance.getone("upgrade", "").lower() == "websocket"
) )
try:
url_bytes = self.url.encode("ASCII")
except UnicodeEncodeError:
raise BadRequest("URL may only contain US-ASCII characters.")
# Prepare a Request object # Prepare a Request object
request = self.protocol.request_class( request = self.protocol.request_class(
url_bytes=self.url.encode(), url_bytes=url_bytes,
headers=headers_instance, headers=headers_instance,
head=bytes(head), head=bytes(head),
version=protocol[5:], version=protocol[5:],
@ -445,9 +450,18 @@ class Http(Stream, metaclass=TouchUpMeta):
bogus response for error handling use. bogus response for error handling use.
""" """
# Reformat any URL already received with \xHH escapes for better logs
url_bytes = (
self.url.encode(errors="surrogateescape")
.decode("ASCII", errors="backslashreplace")
.encode("ASCII")
if self.url
else b"*"
)
# FIXME: Avoid this by refactoring error handling and response code # FIXME: Avoid this by refactoring error handling and response code
self.request = self.protocol.request_class( self.request = self.protocol.request_class(
url_bytes=self.url.encode() if self.url else b"*", url_bytes=url_bytes,
headers=Header({}), headers=Header({}),
version="1.1", version="1.1",
method="NONE", method="NONE",

View File

@ -18,7 +18,12 @@ from typing import (
from sanic.compat import Header from sanic.compat import Header
from sanic.constants import LocalCertCreator from sanic.constants import LocalCertCreator
from sanic.exceptions import PayloadTooLarge, SanicException, ServerError from sanic.exceptions import (
BadRequest,
PayloadTooLarge,
SanicException,
ServerError,
)
from sanic.helpers import has_message_body from sanic.helpers import has_message_body
from sanic.http.constants import Stage from sanic.http.constants import Stage
from sanic.http.stream import Stream from sanic.http.stream import Stream
@ -333,7 +338,17 @@ class Http3:
return self.receivers[stream_id] return self.receivers[stream_id]
def _make_request(self, event: HeadersReceived) -> Request: def _make_request(self, event: HeadersReceived) -> Request:
headers = Header(((k.decode(), v.decode()) for k, v in event.headers)) try:
headers = Header(
(
(k.decode("ASCII"), v.decode(errors="surrogateescape"))
for k, v in event.headers
)
)
except UnicodeDecodeError:
raise BadRequest(
"Header names may only contain US-ASCII characters."
)
method = headers[":method"] method = headers[":method"]
path = headers[":path"] path = headers[":path"]
scheme = headers.pop(":scheme", "") scheme = headers.pop(":scheme", "")
@ -342,9 +357,14 @@ class Http3:
if authority: if authority:
headers["host"] = authority headers["host"] = authority
try:
url_bytes = path.encode("ASCII")
except UnicodeEncodeError:
raise BadRequest("URL may only contain US-ASCII characters.")
transport = HTTP3Transport(self.protocol) transport = HTTP3Transport(self.protocol)
request = self.protocol.request_class( request = self.protocol.request_class(
path.encode(), url_bytes,
headers, headers,
"3", "3",
method, method,

View File

@ -126,7 +126,26 @@ logger.addFilter(_verbosity_filter)
def deprecation(message: str, version: float): # no cov def deprecation(message: str, version: float): # no cov
version_info = f"[DEPRECATION v{version}] " """
Add a deprecation notice
Example when a feature is being removed. In this case, version
should be AT LEAST next version + 2
deprecation("Helpful message", 99.9)
Example when a feature is deprecated but not being removed:
deprecation("Helpful message", 0)
:param message: The message of the notice
:type message: str
:param version: The version when the feature will be removed. If it is
not being removed, then set version=0.
:type version: float
"""
version_display = f" v{version}" if version else ""
version_info = f"[DEPRECATION{version_display}] "
if is_atty(): if is_atty():
version_info = f"{Colors.RED}{version_info}" version_info = f"{Colors.RED}{version_info}"
message = f"{Colors.YELLOW}{message}{Colors.END}" message = f"{Colors.YELLOW}{message}{Colors.END}"

View File

@ -47,17 +47,16 @@ from sanic.helpers import Default, _default
from sanic.http.constants import HTTP from sanic.http.constants import HTTP
from sanic.http.tls import get_ssl_context, process_to_context from sanic.http.tls import get_ssl_context, process_to_context
from sanic.http.tls.context import SanicSSLContext from sanic.http.tls.context import SanicSSLContext
from sanic.log import Colors, deprecation, error_logger, logger from sanic.log import Colors, error_logger, logger
from sanic.models.handler_types import ListenerType from sanic.models.handler_types import ListenerType
from sanic.server import Signal as ServerSignal from sanic.server import Signal as ServerSignal
from sanic.server import try_use_uvloop from sanic.server import try_use_uvloop
from sanic.server.async_server import AsyncioServer from sanic.server.async_server import AsyncioServer
from sanic.server.events import trigger_events from sanic.server.events import trigger_events
from sanic.server.legacy import watchdog
from sanic.server.loop import try_windows_loop from sanic.server.loop import try_windows_loop
from sanic.server.protocols.http_protocol import HttpProtocol from sanic.server.protocols.http_protocol import HttpProtocol
from sanic.server.protocols.websocket_protocol import WebSocketProtocol from sanic.server.protocols.websocket_protocol import WebSocketProtocol
from sanic.server.runners import serve, serve_multiple, serve_single from sanic.server.runners import serve
from sanic.server.socket import configure_socket, remove_unix_socket from sanic.server.socket import configure_socket, remove_unix_socket
from sanic.worker.loader import AppLoader from sanic.worker.loader import AppLoader
from sanic.worker.manager import WorkerManager from sanic.worker.manager import WorkerManager
@ -135,7 +134,6 @@ class StartupMixin(metaclass=SanicMeta):
motd_display: Optional[Dict[str, str]] = None, motd_display: Optional[Dict[str, str]] = None,
auto_tls: bool = False, auto_tls: bool = False,
single_process: bool = False, single_process: bool = False,
legacy: bool = False,
) -> None: ) -> None:
""" """
Run the HTTP Server and listen until keyboard interrupt or term Run the HTTP Server and listen until keyboard interrupt or term
@ -197,13 +195,10 @@ class StartupMixin(metaclass=SanicMeta):
motd_display=motd_display, motd_display=motd_display,
auto_tls=auto_tls, auto_tls=auto_tls,
single_process=single_process, single_process=single_process,
legacy=legacy,
) )
if single_process: if single_process:
serve = self.__class__.serve_single serve = self.__class__.serve_single
elif legacy:
serve = self.__class__.serve_legacy
else: else:
serve = self.__class__.serve serve = self.__class__.serve
serve(primary=self) # type: ignore serve(primary=self) # type: ignore
@ -235,7 +230,6 @@ class StartupMixin(metaclass=SanicMeta):
coffee: bool = False, coffee: bool = False,
auto_tls: bool = False, auto_tls: bool = False,
single_process: bool = False, single_process: bool = False,
legacy: bool = False,
) -> None: ) -> None:
if version == 3 and self.state.server_info: if version == 3 and self.state.server_info:
raise RuntimeError( raise RuntimeError(
@ -264,13 +258,10 @@ class StartupMixin(metaclass=SanicMeta):
"or auto-reload" "or auto-reload"
) )
if single_process and legacy: if register_sys_signals is False and not single_process:
raise RuntimeError("Cannot run single process and legacy mode")
if register_sys_signals is False and not (single_process or legacy):
raise RuntimeError( raise RuntimeError(
"Cannot run Sanic.serve with register_sys_signals=False. " "Cannot run Sanic.serve with register_sys_signals=False. "
"Use either Sanic.serve_single or Sanic.serve_legacy." "Use Sanic.serve_single."
) )
if motd_display: if motd_display:
@ -956,76 +947,6 @@ class StartupMixin(metaclass=SanicMeta):
cls._cleanup_env_vars() cls._cleanup_env_vars()
cls._cleanup_apps() cls._cleanup_apps()
@classmethod
def serve_legacy(cls, primary: Optional[Sanic] = None) -> None:
apps = list(cls._app_registry.values())
if not primary:
try:
primary = apps[0]
except IndexError:
raise RuntimeError("Did not find any applications.")
reloader_start = primary.listeners.get("reload_process_start")
reloader_stop = primary.listeners.get("reload_process_stop")
# We want to run auto_reload if ANY of the applications have it enabled
if (
cls.should_auto_reload()
and os.environ.get("SANIC_SERVER_RUNNING") != "true"
): # no cov
loop = new_event_loop()
trigger_events(reloader_start, loop, primary)
reload_dirs: Set[Path] = primary.state.reload_dirs.union(
*(app.state.reload_dirs for app in apps)
)
watchdog(1.0, reload_dirs)
trigger_events(reloader_stop, loop, primary)
return
# This exists primarily for unit testing
if not primary.state.server_info: # no cov
for app in apps:
app.state.server_info.clear()
return
primary_server_info = primary.state.server_info[0]
primary.before_server_start(partial(primary._start_servers, apps=apps))
deprecation(
f"{Colors.YELLOW}Running {Colors.SANIC}Sanic {Colors.YELLOW}w/ "
f"LEGACY manager.{Colors.END} Support for will be dropped in "
"version 23.3.",
23.3,
)
try:
primary_server_info.stage = ServerStage.SERVING
if primary.state.workers > 1 and os.name != "posix": # no cov
logger.warning(
f"Multiprocessing is currently not supported on {os.name},"
" using workers=1 instead"
)
primary.state.workers = 1
if primary.state.workers == 1:
serve_single(primary_server_info.settings)
elif primary.state.workers == 0:
raise RuntimeError("Cannot serve with no workers")
else:
serve_multiple(
primary_server_info.settings, primary.state.workers
)
except BaseException:
error_logger.exception(
"Experienced exception while trying to serve"
)
raise
finally:
primary_server_info.stage = ServerStage.STOPPED
logger.info("Server Stopped")
cls._cleanup_env_vars()
cls._cleanup_apps()
async def _start_servers( async def _start_servers(
self, self,
primary: Sanic, primary: Sanic,

View File

@ -3,7 +3,7 @@ from functools import partial, wraps
from mimetypes import guess_type from mimetypes import guess_type
from os import PathLike, path from os import PathLike, path
from pathlib import Path, PurePath from pathlib import Path, PurePath
from typing import Optional, Sequence, Set, Union, cast from typing import Optional, Sequence, Set, Union
from urllib.parse import unquote from urllib.parse import unquote
from sanic_routing.route import Route from sanic_routing.route import Route
@ -14,7 +14,7 @@ from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE
from sanic.exceptions import FileNotFound, HeaderNotFound, RangeNotSatisfiable from sanic.exceptions import FileNotFound, HeaderNotFound, RangeNotSatisfiable
from sanic.handlers import ContentRangeHandler from sanic.handlers import ContentRangeHandler
from sanic.handlers.directory import DirectoryHandler from sanic.handlers.directory import DirectoryHandler
from sanic.log import deprecation, error_logger from sanic.log import error_logger
from sanic.mixins.base import BaseMixin from sanic.mixins.base import BaseMixin
from sanic.models.futures import FutureStatic from sanic.models.futures import FutureStatic
from sanic.request import Request from sanic.request import Request
@ -31,7 +31,7 @@ class StaticMixin(BaseMixin, metaclass=SanicMeta):
def static( def static(
self, self,
uri: str, uri: str,
file_or_directory: Union[PathLike, str, bytes], file_or_directory: Union[PathLike, str],
pattern: str = r"/?.+", pattern: str = r"/?.+",
use_modified_since: bool = True, use_modified_since: bool = True,
use_content_range: bool = False, use_content_range: bool = False,
@ -94,14 +94,12 @@ class StaticMixin(BaseMixin, metaclass=SanicMeta):
f"Static route must be a valid path, not {file_or_directory}" f"Static route must be a valid path, not {file_or_directory}"
) )
if isinstance(file_or_directory, bytes): try:
deprecation( file_or_directory = Path(file_or_directory)
"Serving a static directory with a bytes string is " except TypeError:
"deprecated and will be removed in v22.9.", raise TypeError(
22.9, "Static file or directory must be a path-like object or string"
) )
file_or_directory = cast(str, file_or_directory.decode())
file_or_directory = Path(file_or_directory)
if directory_handler and (directory_view or index): if directory_handler and (directory_view or index):
raise ValueError( raise ValueError(

11
sanic/request/__init__.py Normal file
View File

@ -0,0 +1,11 @@
from .form import File, parse_multipart_form
from .parameters import RequestParameters
from .types import Request
__all__ = (
"File",
"parse_multipart_form",
"Request",
"RequestParameters",
)

110
sanic/request/form.py Normal file
View File

@ -0,0 +1,110 @@
from __future__ import annotations
import email.utils
import unicodedata
from typing import NamedTuple
from urllib.parse import unquote
from sanic.headers import parse_content_header
from sanic.log import logger
from .parameters import RequestParameters
class File(NamedTuple):
"""
Model for defining a file. It is a ``namedtuple``, therefore you can
iterate over the object, or access the parameters by name.
:param type: The mimetype, defaults to text/plain
:param body: Bytes of the file
:param name: The filename
"""
type: str
body: bytes
name: str
def parse_multipart_form(body, boundary):
"""
Parse a request body and returns fields and files
:param body: bytes request body
:param boundary: bytes multipart boundary
:return: fields (RequestParameters), files (RequestParameters)
"""
files = {}
fields = {}
form_parts = body.split(boundary)
for form_part in form_parts[1:-1]:
file_name = None
content_type = "text/plain"
content_charset = "utf-8"
field_name = None
line_index = 2
line_end_index = 0
while not line_end_index == -1:
line_end_index = form_part.find(b"\r\n", line_index)
form_line = form_part[line_index:line_end_index].decode("utf-8")
line_index = line_end_index + 2
if not form_line:
break
colon_index = form_line.index(":")
idx = colon_index + 2
form_header_field = form_line[0:colon_index].lower()
form_header_value, form_parameters = parse_content_header(
form_line[idx:]
)
if form_header_field == "content-disposition":
field_name = form_parameters.get("name")
file_name = form_parameters.get("filename")
# non-ASCII filenames in RFC2231, "filename*" format
if file_name is None and form_parameters.get("filename*"):
encoding, _, value = email.utils.decode_rfc2231(
form_parameters["filename*"]
)
file_name = unquote(value, encoding=encoding)
# Normalize to NFC (Apple MacOS/iOS send NFD)
# Notes:
# - No effect for Windows, Linux or Android clients which
# already send NFC
# - Python open() is tricky (creates files in NFC no matter
# which form you use)
if file_name is not None:
file_name = unicodedata.normalize("NFC", file_name)
elif form_header_field == "content-type":
content_type = form_header_value
content_charset = form_parameters.get("charset", "utf-8")
if field_name:
post_data = form_part[line_index:-4]
if file_name is None:
value = post_data.decode(content_charset)
if field_name in fields:
fields[field_name].append(value)
else:
fields[field_name] = [value]
else:
form_file = File(
type=content_type, name=file_name, body=post_data
)
if field_name in files:
files[field_name].append(form_file)
else:
files[field_name] = [form_file]
else:
logger.debug(
"Form-data field does not have a 'name' parameter "
"in the Content-Disposition header"
)
return RequestParameters(fields), RequestParameters(files)

View File

@ -0,0 +1,22 @@
from __future__ import annotations
from typing import Any, Optional
class RequestParameters(dict):
"""
Hosts a dict with lists as values where get returns the first
value of the list and getlist returns the whole shebang
"""
def get(self, name: str, default: Optional[Any] = None) -> Optional[Any]:
"""Return the first value, either the default or actual"""
return super().get(name, [default])[0]
def getlist(
self, name: str, default: Optional[Any] = None
) -> Optional[Any]:
"""
Return the entire list
"""
return super().get(name, default)

View File

@ -8,10 +8,10 @@ from typing import (
DefaultDict, DefaultDict,
Dict, Dict,
List, List,
NamedTuple,
Optional, Optional,
Tuple, Tuple,
Union, Union,
cast,
) )
from sanic_routing.route import Route from sanic_routing.route import Route
@ -26,14 +26,11 @@ if TYPE_CHECKING:
from sanic.server import ConnInfo from sanic.server import ConnInfo
from sanic.app import Sanic from sanic.app import Sanic
import email.utils
import unicodedata
import uuid import uuid
from collections import defaultdict from collections import defaultdict
from http.cookies import SimpleCookie
from types import SimpleNamespace from types import SimpleNamespace
from urllib.parse import parse_qs, parse_qsl, unquote, urlunparse from urllib.parse import parse_qs, parse_qsl, urlunparse
from httptools import parse_url from httptools import parse_url
from httptools.parser.errors import HttpParserInvalidURLError from httptools.parser.errors import HttpParserInvalidURLError
@ -45,6 +42,7 @@ from sanic.constants import (
IDEMPOTENT_HTTP_METHODS, IDEMPOTENT_HTTP_METHODS,
SAFE_HTTP_METHODS, SAFE_HTTP_METHODS,
) )
from sanic.cookies.request import CookieRequestParameters, parse_cookie
from sanic.exceptions import BadRequest, BadURL, ServerError from sanic.exceptions import BadRequest, BadURL, ServerError
from sanic.headers import ( from sanic.headers import (
AcceptList, AcceptList,
@ -57,10 +55,13 @@ from sanic.headers import (
parse_xforwarded, parse_xforwarded,
) )
from sanic.http import Stage from sanic.http import Stage
from sanic.log import deprecation, error_logger, logger from sanic.log import error_logger
from sanic.models.protocol_types import TransportProtocol from sanic.models.protocol_types import TransportProtocol
from sanic.response import BaseHTTPResponse, HTTPResponse from sanic.response import BaseHTTPResponse, HTTPResponse
from .form import parse_multipart_form
from .parameters import RequestParameters
try: try:
from ujson import loads as json_loads # type: ignore from ujson import loads as json_loads # type: ignore
@ -68,25 +69,6 @@ except ImportError:
from json import loads as json_loads # type: ignore from json import loads as json_loads # type: ignore
class RequestParameters(dict):
"""
Hosts a dict with lists as values where get returns the first
value of the list and getlist returns the whole shebang
"""
def get(self, name: str, default: Optional[Any] = None) -> Optional[Any]:
"""Return the first value, either the default or actual"""
return super().get(name, [default])[0]
def getlist(
self, name: str, default: Optional[Any] = None
) -> Optional[Any]:
"""
Return the entire list
"""
return super().get(name, default)
class Request: class Request:
""" """
Properties of an HTTP request such as URL, headers, etc. Properties of an HTTP request such as URL, headers, etc.
@ -120,6 +102,7 @@ class Request:
"method", "method",
"parsed_accept", "parsed_accept",
"parsed_args", "parsed_args",
"parsed_cookies",
"parsed_credentials", "parsed_credentials",
"parsed_files", "parsed_files",
"parsed_form", "parsed_form",
@ -150,7 +133,8 @@ class Request:
try: try:
self._parsed_url = parse_url(url_bytes) self._parsed_url = parse_url(url_bytes)
except HttpParserInvalidURLError: except HttpParserInvalidURLError:
raise BadURL(f"Bad URL: {url_bytes.decode()}") url = url_bytes.decode(errors="backslashreplace")
raise BadURL(f"Bad URL: {url}")
self._id: Optional[Union[uuid.UUID, str, int]] = None self._id: Optional[Union[uuid.UUID, str, int]] = None
self._name: Optional[str] = None self._name: Optional[str] = None
self._stream_id = stream_id self._stream_id = stream_id
@ -166,25 +150,25 @@ class Request:
self.body = b"" self.body = b""
self.conn_info: Optional[ConnInfo] = None self.conn_info: Optional[ConnInfo] = None
self.ctx = SimpleNamespace() self.ctx = SimpleNamespace()
self.parsed_forwarded: Optional[Options] = None
self.parsed_accept: Optional[AcceptList] = None self.parsed_accept: Optional[AcceptList] = None
self.parsed_credentials: Optional[Credentials] = None
self.parsed_json = None
self.parsed_form: Optional[RequestParameters] = None
self.parsed_files: Optional[RequestParameters] = None
self.parsed_token: Optional[str] = None
self.parsed_args: DefaultDict[ self.parsed_args: DefaultDict[
Tuple[bool, bool, str, str], RequestParameters Tuple[bool, bool, str, str], RequestParameters
] = defaultdict(RequestParameters) ] = defaultdict(RequestParameters)
self.parsed_cookies: Optional[RequestParameters] = None
self.parsed_credentials: Optional[Credentials] = None
self.parsed_files: Optional[RequestParameters] = None
self.parsed_form: Optional[RequestParameters] = None
self.parsed_forwarded: Optional[Options] = None
self.parsed_json = None
self.parsed_not_grouped_args: DefaultDict[ self.parsed_not_grouped_args: DefaultDict[
Tuple[bool, bool, str, str], List[Tuple[str, str]] Tuple[bool, bool, str, str], List[Tuple[str, str]]
] = defaultdict(list) ] = defaultdict(list)
self.parsed_token: Optional[str] = None
self._request_middleware_started = False self._request_middleware_started = False
self._response_middleware_started = False self._response_middleware_started = False
self.responded: bool = False self.responded: bool = False
self.route: Optional[Route] = None self.route: Optional[Route] = None
self.stream: Optional[Stream] = None self.stream: Optional[Stream] = None
self._cookies: Optional[Dict[str, str]] = None
self._match_info: Dict[str, Any] = {} self._match_info: Dict[str, Any] = {}
self._protocol = None self._protocol = None
@ -221,16 +205,6 @@ class Request:
def generate_id(*_): def generate_id(*_):
return uuid.uuid4() return uuid.uuid4()
@property
def request_middleware_started(self):
deprecation(
"Request.request_middleware_started has been deprecated and will"
"be removed. You should set a flag on the request context using"
"either middleware or signals if you need this feature.",
23.3,
)
return self._request_middleware_started
@property @property
def stream_id(self): def stream_id(self):
""" """
@ -731,24 +705,21 @@ class Request:
default values. default values.
""" """
def get_cookies(self) -> RequestParameters:
cookie = self.headers.getone("cookie", "")
self.parsed_cookies = CookieRequestParameters(parse_cookie(cookie))
return self.parsed_cookies
@property @property
def cookies(self) -> Dict[str, str]: def cookies(self) -> RequestParameters:
""" """
:return: Incoming cookies on the request :return: Incoming cookies on the request
:rtype: Dict[str, str] :rtype: Dict[str, str]
""" """
if self._cookies is None: if self.parsed_cookies is None:
cookie = self.headers.getone("cookie", None) self.get_cookies()
if cookie is not None: return cast(CookieRequestParameters, self.parsed_cookies)
cookies: SimpleCookie = SimpleCookie()
cookies.load(cookie)
self._cookies = {
name: cookie.value for name, cookie in cookies.items()
}
else:
self._cookies = {}
return self._cookies
@property @property
def content_type(self) -> str: def content_type(self) -> str:
@ -1026,101 +997,3 @@ class Request:
:rtype: bool :rtype: bool
""" """
return self.method in CACHEABLE_HTTP_METHODS return self.method in CACHEABLE_HTTP_METHODS
class File(NamedTuple):
"""
Model for defining a file. It is a ``namedtuple``, therefore you can
iterate over the object, or access the parameters by name.
:param type: The mimetype, defaults to text/plain
:param body: Bytes of the file
:param name: The filename
"""
type: str
body: bytes
name: str
def parse_multipart_form(body, boundary):
"""
Parse a request body and returns fields and files
:param body: bytes request body
:param boundary: bytes multipart boundary
:return: fields (RequestParameters), files (RequestParameters)
"""
files = RequestParameters()
fields = RequestParameters()
form_parts = body.split(boundary)
for form_part in form_parts[1:-1]:
file_name = None
content_type = "text/plain"
content_charset = "utf-8"
field_name = None
line_index = 2
line_end_index = 0
while not line_end_index == -1:
line_end_index = form_part.find(b"\r\n", line_index)
form_line = form_part[line_index:line_end_index].decode("utf-8")
line_index = line_end_index + 2
if not form_line:
break
colon_index = form_line.index(":")
idx = colon_index + 2
form_header_field = form_line[0:colon_index].lower()
form_header_value, form_parameters = parse_content_header(
form_line[idx:]
)
if form_header_field == "content-disposition":
field_name = form_parameters.get("name")
file_name = form_parameters.get("filename")
# non-ASCII filenames in RFC2231, "filename*" format
if file_name is None and form_parameters.get("filename*"):
encoding, _, value = email.utils.decode_rfc2231(
form_parameters["filename*"]
)
file_name = unquote(value, encoding=encoding)
# Normalize to NFC (Apple MacOS/iOS send NFD)
# Notes:
# - No effect for Windows, Linux or Android clients which
# already send NFC
# - Python open() is tricky (creates files in NFC no matter
# which form you use)
if file_name is not None:
file_name = unicodedata.normalize("NFC", file_name)
elif form_header_field == "content-type":
content_type = form_header_value
content_charset = form_parameters.get("charset", "utf-8")
if field_name:
post_data = form_part[line_index:-4]
if file_name is None:
value = post_data.decode(content_charset)
if field_name in fields:
fields[field_name].append(value)
else:
fields[field_name] = [value]
else:
form_file = File(
type=content_type, name=file_name, body=post_data
)
if field_name in files:
files[field_name].append(form_file)
else:
files[field_name] = [form_file]
else:
logger.debug(
"Form-data field does not have a 'name' parameter "
"in the Content-Disposition header"
)
return fields, files

View File

@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
from datetime import datetime
from functools import partial from functools import partial
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
@ -17,6 +18,7 @@ from typing import (
from sanic.compat import Header from sanic.compat import Header
from sanic.cookies import CookieJar from sanic.cookies import CookieJar
from sanic.cookies.response import Cookie, SameSite
from sanic.exceptions import SanicException, ServerError from sanic.exceptions import SanicException, ServerError
from sanic.helpers import ( from sanic.helpers import (
Default, Default,
@ -158,6 +160,117 @@ class BaseHTTPResponse:
end_stream=end_stream or False, end_stream=end_stream or False,
) )
def add_cookie(
self,
key: str,
value: str,
*,
path: str = "/",
domain: Optional[str] = None,
secure: bool = True,
max_age: Optional[int] = None,
expires: Optional[datetime] = None,
httponly: bool = False,
samesite: Optional[SameSite] = "Lax",
partitioned: bool = False,
comment: Optional[str] = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> Cookie:
"""
Add a cookie to the CookieJar
:param key: Key of the cookie
:type key: str
:param value: Value of the cookie
:type value: str
:param path: Path of the cookie, defaults to None
:type path: Optional[str], optional
:param domain: Domain of the cookie, defaults to None
:type domain: Optional[str], optional
:param secure: Whether to set it as a secure cookie, defaults to True
:type secure: bool
:param max_age: Max age of the cookie in seconds; if set to 0 a
browser should delete it, defaults to None
:type max_age: Optional[int], optional
:param expires: When the cookie expires; if set to None browsers
should set it as a session cookie, defaults to None
:type expires: Optional[datetime], optional
:param httponly: Whether to set it as HTTP only, defaults to False
:type httponly: bool
:param samesite: How to set the samesite property, should be
strict, lax or none (case insensitive), defaults to Lax
:type samesite: Optional[SameSite], optional
:param partitioned: Whether to set it as partitioned, defaults to False
:type partitioned: bool
:param comment: A cookie comment, defaults to None
:type comment: Optional[str], optional
:param host_prefix: Whether to add __Host- as a prefix to the key.
This requires that path="/", domain=None, and secure=True,
defaults to False
:type host_prefix: bool
:param secure_prefix: Whether to add __Secure- as a prefix to the key.
This requires that secure=True, defaults to False
:type secure_prefix: bool
:return: The instance of the created cookie
:rtype: Cookie
"""
return self.cookies.add_cookie(
key=key,
value=value,
path=path,
domain=domain,
secure=secure,
max_age=max_age,
expires=expires,
httponly=httponly,
samesite=samesite,
partitioned=partitioned,
comment=comment,
host_prefix=host_prefix,
secure_prefix=secure_prefix,
)
def delete_cookie(
self,
key: str,
*,
path: str = "/",
domain: Optional[str] = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> None:
"""
Delete a cookie
This will effectively set it as Max-Age: 0, which a browser should
interpret it to mean: "delete the cookie".
Since it is a browser/client implementation, your results may vary
depending upon which client is being used.
:param key: The key to be deleted
:type key: str
:param path: Path of the cookie, defaults to None
:type path: Optional[str], optional
:param domain: Domain of the cookie, defaults to None
:type domain: Optional[str], optional
:param host_prefix: Whether to add __Host- as a prefix to the key.
This requires that path="/", domain=None, and secure=True,
defaults to False
:type host_prefix: bool
:param secure_prefix: Whether to add __Secure- as a prefix to the key.
This requires that secure=True, defaults to False
:type secure_prefix: bool
"""
self.cookies.delete_cookie(
key=key,
path=path,
domain=domain,
host_prefix=host_prefix,
secure_prefix=secure_prefix,
)
class HTTPResponse(BaseHTTPResponse): class HTTPResponse(BaseHTTPResponse):
""" """
@ -407,6 +520,8 @@ class ResponseStream:
headers: Optional[Union[Header, Dict[str, str]]] = None, headers: Optional[Union[Header, Dict[str, str]]] = None,
content_type: Optional[str] = None, content_type: Optional[str] = None,
): ):
if not isinstance(headers, Header):
headers = Header(headers)
self.streaming_fn = streaming_fn self.streaming_fn = streaming_fn
self.status = status self.status = status
self.headers = headers or Header() self.headers = headers or Header()

View File

@ -44,7 +44,9 @@ class Router(BaseRouter):
raise MethodNotAllowed( raise MethodNotAllowed(
f"Method {method} not allowed for URL {path}", f"Method {method} not allowed for URL {path}",
method=method, method=method,
allowed_methods=e.allowed_methods, allowed_methods=tuple(e.allowed_methods)
if e.allowed_methods
else None,
) from None ) from None
@lru_cache(maxsize=ROUTER_CACHE_SIZE) @lru_cache(maxsize=ROUTER_CACHE_SIZE)
@ -135,7 +137,16 @@ class Router(BaseRouter):
if host: if host:
params.update({"requirements": {"host": host}}) params.update({"requirements": {"host": host}})
ident = name
if len(hosts) > 1:
ident = (
f"{name}_{host.replace('.', '_')}"
if name
else "__unnamed__"
)
route = super().add(**params) # type: ignore route = super().add(**params) # type: ignore
route.extra.ident = ident
route.extra.ignore_body = ignore_body route.extra.ignore_body = ignore_body
route.extra.stream = stream route.extra.stream = stream
route.extra.hosts = hosts route.extra.hosts = hosts

View File

@ -2,7 +2,7 @@ from sanic.models.server_types import ConnInfo, Signal
from sanic.server.async_server import AsyncioServer from sanic.server.async_server import AsyncioServer
from sanic.server.loop import try_use_uvloop from sanic.server.loop import try_use_uvloop
from sanic.server.protocols.http_protocol import HttpProtocol from sanic.server.protocols.http_protocol import HttpProtocol
from sanic.server.runners import serve, serve_multiple, serve_single from sanic.server.runners import serve
__all__ = ( __all__ = (
@ -11,7 +11,5 @@ __all__ = (
"HttpProtocol", "HttpProtocol",
"Signal", "Signal",
"serve", "serve",
"serve_multiple",
"serve_single",
"try_use_uvloop", "try_use_uvloop",
) )

View File

@ -1,123 +0,0 @@
import itertools
import os
import signal
import subprocess
import sys
from time import sleep
def _iter_module_files():
"""This iterates over all relevant Python files.
It goes through all
loaded files from modules, all files in folders of already loaded modules
as well as all files reachable through a package.
"""
# The list call is necessary on Python 3 in case the module
# dictionary modifies during iteration.
for module in list(sys.modules.values()):
if module is None:
continue
filename = getattr(module, "__file__", None)
if filename:
old = None
while not os.path.isfile(filename):
old = filename
filename = os.path.dirname(filename)
if filename == old:
break
else:
if filename[-4:] in (".pyc", ".pyo"):
filename = filename[:-1]
yield filename
def _get_args_for_reloading():
"""Returns the executable."""
main_module = sys.modules["__main__"]
mod_spec = getattr(main_module, "__spec__", None)
if sys.argv[0] in ("", "-c"):
raise RuntimeError(
f"Autoreloader cannot work with argv[0]={sys.argv[0]!r}"
)
if mod_spec:
# Parent exe was launched as a module rather than a script
return [sys.executable, "-m", mod_spec.name] + sys.argv[1:]
return [sys.executable] + sys.argv
def restart_with_reloader(changed=None):
"""Create a new process and a subprocess in it with the same arguments as
this one.
"""
reloaded = ",".join(changed) if changed else ""
return subprocess.Popen( # nosec B603
_get_args_for_reloading(),
env={
**os.environ,
"SANIC_SERVER_RUNNING": "true",
"SANIC_RELOADER_PROCESS": "true",
"SANIC_RELOADED_FILES": reloaded,
},
)
def _check_file(filename, mtimes):
need_reload = False
mtime = os.stat(filename).st_mtime
old_time = mtimes.get(filename)
if old_time is None:
mtimes[filename] = mtime
elif mtime > old_time:
mtimes[filename] = mtime
need_reload = True
return need_reload
def watchdog(sleep_interval, reload_dirs):
"""Watch project files, restart worker process if a change happened.
:param sleep_interval: interval in second.
:return: Nothing
"""
def interrupt_self(*args):
raise KeyboardInterrupt
mtimes = {}
signal.signal(signal.SIGTERM, interrupt_self)
if os.name == "nt":
signal.signal(signal.SIGBREAK, interrupt_self)
worker_process = restart_with_reloader()
try:
while True:
changed = set()
for filename in itertools.chain(
_iter_module_files(),
*(d.glob("**/*") for d in reload_dirs),
):
try:
if _check_file(filename, mtimes):
path = (
filename
if isinstance(filename, str)
else filename.resolve()
)
changed.add(str(path))
except OSError:
continue
if changed:
worker_process.terminate()
worker_process.wait()
worker_process = restart_with_reloader(changed)
sleep(sleep_interval)
except KeyboardInterrupt:
pass
finally:
worker_process.terminate()
worker_process.wait()

View File

@ -9,19 +9,17 @@ from sanic.config import Config
from sanic.exceptions import ServerError from sanic.exceptions import ServerError
from sanic.http.constants import HTTP from sanic.http.constants import HTTP
from sanic.http.tls import get_ssl_context from sanic.http.tls import get_ssl_context
from sanic.server.events import trigger_events
if TYPE_CHECKING: if TYPE_CHECKING:
from sanic.app import Sanic from sanic.app import Sanic
import asyncio import asyncio
import multiprocessing
import os import os
import socket import socket
from functools import partial from functools import partial
from signal import SIG_IGN, SIGINT, SIGTERM, Signals from signal import SIG_IGN, SIGINT, SIGTERM
from signal import signal as signal_func from signal import signal as signal_func
from sanic.application.ext import setup_ext from sanic.application.ext import setup_ext
@ -31,11 +29,7 @@ from sanic.log import error_logger, server_logger
from sanic.models.server_types import Signal from sanic.models.server_types import Signal
from sanic.server.async_server import AsyncioServer from sanic.server.async_server import AsyncioServer
from sanic.server.protocols.http_protocol import Http3Protocol, HttpProtocol from sanic.server.protocols.http_protocol import Http3Protocol, HttpProtocol
from sanic.server.socket import ( from sanic.server.socket import bind_unix_socket, remove_unix_socket
bind_socket,
bind_unix_socket,
remove_unix_socket,
)
try: try:
@ -319,94 +313,6 @@ def _serve_http_3(
) )
def serve_single(server_settings):
main_start = server_settings.pop("main_start", None)
main_stop = server_settings.pop("main_stop", None)
if not server_settings.get("run_async"):
# create new event_loop after fork
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
server_settings["loop"] = loop
trigger_events(main_start, server_settings["loop"])
serve(**server_settings)
trigger_events(main_stop, server_settings["loop"])
server_settings["loop"].close()
def serve_multiple(server_settings, workers):
"""Start multiple server processes simultaneously. Stop on interrupt
and terminate signals, and drain connections when complete.
:param server_settings: kw arguments to be passed to the serve function
:param workers: number of workers to launch
:param stop_event: if provided, is used as a stop signal
:return:
"""
server_settings["reuse_port"] = True
server_settings["run_multiple"] = True
main_start = server_settings.pop("main_start", None)
main_stop = server_settings.pop("main_stop", None)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
trigger_events(main_start, loop)
# Create a listening socket or use the one in settings
sock = server_settings.get("sock")
unix = server_settings["unix"]
backlog = server_settings["backlog"]
if unix:
sock = bind_unix_socket(unix, backlog=backlog)
server_settings["unix"] = unix
if sock is None:
sock = bind_socket(
server_settings["host"], server_settings["port"], backlog=backlog
)
sock.set_inheritable(True)
server_settings["sock"] = sock
server_settings["host"] = None
server_settings["port"] = None
processes = []
def sig_handler(signal, frame):
server_logger.info(
"Received signal %s. Shutting down.", Signals(signal).name
)
for process in processes:
os.kill(process.pid, SIGTERM)
signal_func(SIGINT, lambda s, f: sig_handler(s, f))
signal_func(SIGTERM, lambda s, f: sig_handler(s, f))
mp = multiprocessing.get_context("fork")
for _ in range(workers):
process = mp.Process(
target=serve,
kwargs=server_settings,
)
process.daemon = True
process.start()
processes.append(process)
for process in processes:
process.join()
# the above processes will block this until they're stopped
for process in processes:
process.terminate()
trigger_events(main_stop, loop)
sock.close()
loop.close()
remove_unix_socket(unix)
def _build_protocol_kwargs( def _build_protocol_kwargs(
protocol: Type[asyncio.Protocol], config: Config protocol: Type[asyncio.Protocol], config: Config
) -> Dict[str, Union[int, float]]: ) -> Dict[str, Union[int, float]]:

View File

@ -29,7 +29,7 @@ except ImportError: # websockets >= 11.0
from websockets.typing import Data from websockets.typing import Data
from sanic.log import deprecation, error_logger, logger from sanic.log import error_logger, logger
from sanic.server.protocols.base_protocol import SanicProtocol from sanic.server.protocols.base_protocol import SanicProtocol
from ...exceptions import ServerError, WebsocketClosed from ...exceptions import ServerError, WebsocketClosed
@ -99,15 +99,6 @@ class WebsocketImplProtocol:
def subprotocol(self): def subprotocol(self):
return self.ws_proto.subprotocol return self.ws_proto.subprotocol
@property
def connection(self):
deprecation(
"The connection property has been deprecated and will be removed. "
"Please use the ws_proto property instead going forward.",
22.6,
)
return self.ws_proto
def pause_frames(self): def pause_frames(self):
if not self.can_pause: if not self.can_pause:
return False return False

View File

@ -3,7 +3,9 @@ from __future__ import annotations
import os import os
import sys import sys
from contextlib import suppress
from importlib import import_module from importlib import import_module
from inspect import isfunction
from pathlib import Path from pathlib import Path
from ssl import SSLContext from ssl import SSLContext
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Union, cast from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Union, cast
@ -15,6 +17,8 @@ from sanic.http.tls.creators import MkcertCreator, TrustmeCreator
if TYPE_CHECKING: if TYPE_CHECKING:
from sanic import Sanic as SanicApp from sanic import Sanic as SanicApp
DEFAULT_APP_NAME = "app"
class AppLoader: class AppLoader:
def __init__( def __init__(
@ -36,7 +40,11 @@ class AppLoader:
if module_input: if module_input:
delimiter = ":" if ":" in module_input else "." delimiter = ":" if ":" in module_input else "."
if module_input.count(delimiter): if (
delimiter in module_input
and "\\" not in module_input
and "/" not in module_input
):
module_name, app_name = module_input.rsplit(delimiter, 1) module_name, app_name = module_input.rsplit(delimiter, 1)
self.module_name = module_name self.module_name = module_name
self.app_name = app_name self.app_name = app_name
@ -55,21 +63,30 @@ class AppLoader:
from sanic.app import Sanic from sanic.app import Sanic
from sanic.simple import create_simple_server from sanic.simple import create_simple_server
if self.as_simple: maybe_path = Path(self.module_input)
path = Path(self.module_input) if self.as_simple or (
app = create_simple_server(path) maybe_path.is_dir()
and ("\\" in self.module_input or "/" in self.module_input)
):
app = create_simple_server(maybe_path)
else: else:
if self.module_name == "" and os.path.isdir(self.module_input): implied_app_name = False
raise ValueError( if not self.module_name and not self.app_name:
"App not found.\n" self.module_name = self.module_input
" Please use --simple if you are passing a " self.app_name = DEFAULT_APP_NAME
"directory to sanic.\n" implied_app_name = True
f" eg. sanic {self.module_input} --simple"
)
module = import_module(self.module_name) module = import_module(self.module_name)
app = getattr(module, self.app_name, None) app = getattr(module, self.app_name, None)
if self.as_factory: if not app and implied_app_name:
raise ValueError(
"Looks like you only supplied a module name. Sanic "
"tried to locate an application instance named "
f"{self.module_name}:app, but was unable to locate "
"an application instance. Please provide a path "
"to a global instance of Sanic(), or a callable that "
"will return a Sanic() application instance."
)
if self.as_factory or isfunction(app):
try: try:
app = app(self.args) app = app(self.args)
except TypeError: except TypeError:
@ -80,21 +97,18 @@ class AppLoader:
if ( if (
not isinstance(app, Sanic) not isinstance(app, Sanic)
and self.args and self.args
and hasattr(self.args, "module") and hasattr(self.args, "target")
): ):
if callable(app): with suppress(ModuleNotFoundError):
solution = f"sanic {self.args.module} --factory" maybe_module = import_module(self.module_input)
raise ValueError( app = getattr(maybe_module, "app", None)
"Module is not a Sanic app, it is a " if not app:
f"{app_type_name}\n" message = (
" If this callable returns a " "Module is not a Sanic app, "
f"Sanic instance try: \n{solution}" f"it is a {app_type_name}\n"
f" Perhaps you meant {self.args.target}:app?"
) )
raise ValueError(message)
raise ValueError(
f"Module is not a Sanic app, it is a {app_type_name}\n"
f" Perhaps you meant {self.args.module}:app?"
)
return app return app

View File

@ -312,6 +312,10 @@ class WorkerManager:
def _sync_states(self): def _sync_states(self):
for process in self.processes: for process in self.processes:
state = self.worker_state[process.name].get("state") try:
state = self.worker_state[process.name].get("state")
except KeyError:
process.set_state(ProcessState.TERMINATED, True)
continue
if state and process.state.name != state: if state and process.state.name != state:
process.set_state(ProcessState[state], True) process.set_state(ProcessState[state], True)

View File

@ -116,7 +116,7 @@ requirements = [
] ]
tests_require = [ tests_require = [
"sanic-testing>=22.9.0", "sanic-testing>=23.3.0",
"pytest==7.1.*", "pytest==7.1.*",
"coverage", "coverage",
"beautifulsoup4", "beautifulsoup4",

View File

@ -49,6 +49,6 @@ def create_app_with_args(args):
try: try:
logger.info(f"foo={args.foo}") logger.info(f"foo={args.foo}")
except AttributeError: except AttributeError:
logger.info(f"module={args.module}") logger.info(f"target={args.target}")
return app return app

View File

@ -11,7 +11,7 @@ from aioquic.quic.events import ProtocolNegotiated
from sanic import Request, Sanic from sanic import Request, Sanic
from sanic.compat import Header from sanic.compat import Header
from sanic.config import DEFAULT_CONFIG from sanic.config import DEFAULT_CONFIG
from sanic.exceptions import PayloadTooLarge from sanic.exceptions import BadRequest, PayloadTooLarge
from sanic.http.constants import Stage from sanic.http.constants import Stage
from sanic.http.http3 import Http3, HTTPReceiver from sanic.http.http3 import Http3, HTTPReceiver
from sanic.models.server_types import ConnInfo from sanic.models.server_types import ConnInfo
@ -292,3 +292,48 @@ def test_request_conn_info(app):
receiver = http3.get_receiver_by_stream_id(1) receiver = http3.get_receiver_by_stream_id(1)
assert isinstance(receiver.request.conn_info, ConnInfo) assert isinstance(receiver.request.conn_info, ConnInfo)
def test_request_header_encoding(app):
protocol = generate_protocol(app)
http3 = Http3(protocol, protocol.transmit)
with pytest.raises(BadRequest) as exc_info:
http3.http_event_received(
HeadersReceived(
[
(b":method", b"GET"),
(b":path", b"/location"),
(b":scheme", b"https"),
(b":authority", b"localhost:8443"),
("foo\u00A0".encode(), b"bar"),
],
1,
False,
)
)
assert exc_info.value.status_code == 400
assert (
str(exc_info.value)
== "Header names may only contain US-ASCII characters."
)
def test_request_url_encoding(app):
protocol = generate_protocol(app)
http3 = Http3(protocol, protocol.transmit)
with pytest.raises(BadRequest) as exc_info:
http3.http_event_received(
HeadersReceived(
[
(b":method", b"GET"),
(b":path", b"/location\xA0"),
(b":scheme", b"https"),
(b":authority", b"localhost:8443"),
(b"foo", b"bar"),
],
1,
False,
)
)
assert exc_info.value.status_code == 400
assert str(exc_info.value) == "URL may only contain US-ASCII characters."

View File

@ -448,7 +448,7 @@ def test_custom_context():
@pytest.mark.parametrize("use", (False, True)) @pytest.mark.parametrize("use", (False, True))
def test_uvloop_config(app: Sanic, monkeypatch, use): def test_uvloop_config(app: Sanic, monkeypatch, use):
@app.get("/test") @app.get("/test", name="test")
def handler(request): def handler(request):
return text("ok") return text("ok")
@ -571,21 +571,6 @@ def test_cannot_run_single_process_and_workers_or_auto_reload(
app.run(single_process=True, **extra) app.run(single_process=True, **extra)
def test_cannot_run_single_process_and_legacy(app: Sanic):
message = "Cannot run single process and legacy mode"
with pytest.raises(RuntimeError, match=message):
app.run(single_process=True, legacy=True)
def test_cannot_run_without_sys_signals_with_workers(app: Sanic):
message = (
"Cannot run Sanic.serve with register_sys_signals=False. "
"Use either Sanic.serve_single or Sanic.serve_legacy."
)
with pytest.raises(RuntimeError, match=message):
app.run(register_sys_signals=False, single_process=False, legacy=False)
def test_default_configure_logging(): def test_default_configure_logging():
with patch("sanic.app.logging") as mock: with patch("sanic.app.logging") as mock:
Sanic("Test") Sanic("Test")

View File

@ -652,3 +652,17 @@ async def test_asgi_headers_decoding(app: Sanic, monkeypatch: MonkeyPatch):
_, response = await app.asgi_client.get("/", headers={"Test-Header": "😅"}) _, response = await app.asgi_client.get("/", headers={"Test-Header": "😅"})
assert response.status_code == 200 assert response.status_code == 200
@pytest.mark.asyncio
async def test_asgi_url_decoding(app):
@app.get("/dir/<name>", unquote=True)
def _request(request: Request, name):
return text(name)
# 2F should not become a path separator (unquoted later)
_, response = await app.asgi_client.get("/dir/some%2Fpath")
assert response.text == "some/path"
_, response = await app.asgi_client.get("/dir/some%F0%9F%98%80path")
assert response.text == "some😀path"

View File

@ -67,6 +67,14 @@ def test_bp_copy(app: Sanic):
_, response = app.test_client.get("/version6/page") _, response = app.test_client.get("/version6/page")
assert "Hello world!" in response.text assert "Hello world!" in response.text
route_names = [route.name for route in app.router.routes]
assert "test_bp_copy.test_bp1.handle_request" in route_names
assert "test_bp_copy.test_bp2.handle_request" in route_names
assert "test_bp_copy.test_bp3.handle_request" in route_names
assert "test_bp_copy.test_bp4.handle_request" in route_names
assert "test_bp_copy.test_bp5.handle_request" in route_names
assert "test_bp_copy.test_bp6.handle_request" in route_names
def test_bp_copy_with_route_overwriting(app: Sanic): def test_bp_copy_with_route_overwriting(app: Sanic):
bpv1 = Blueprint("bp_v1", version=1) bpv1 = Blueprint("bp_v1", version=1)

View File

@ -303,6 +303,10 @@ def test_bp_with_host_list(app: Sanic):
assert response.text == "Hello subdomain!" assert response.text == "Hello subdomain!"
route_names = [r.name for r in app.router.routes]
assert "test_bp_with_host_list.test_bp_host.handler1" in route_names
assert "test_bp_with_host_list.test_bp_host.handler2" in route_names
def test_several_bp_with_host_list(app: Sanic): def test_several_bp_with_host_list(app: Sanic):
bp = Blueprint( bp = Blueprint(

View File

@ -43,8 +43,10 @@ def read_app_info(lines: List[str]):
"appname,extra", "appname,extra",
( (
("fake.server.app", None), ("fake.server.app", None),
("fake.server", None),
("fake.server:create_app", "--factory"), ("fake.server:create_app", "--factory"),
("fake.server.create_app()", None), ("fake.server.create_app()", None),
("fake.server.create_app", None),
), ),
) )
def test_server_run( def test_server_run(
@ -60,14 +62,17 @@ def test_server_run(
assert "Goin' Fast @ http://127.0.0.1:8000" in lines assert "Goin' Fast @ http://127.0.0.1:8000" in lines
def test_server_run_factory_with_args(caplog): @pytest.mark.parametrize(
command = [ "command",
"fake.server.create_app_with_args", (
"--factory", ["fake.server.create_app_with_args", "--factory"],
] ["fake.server.create_app_with_args"],
),
)
def test_server_run_factory_with_args(caplog, command):
lines = capture(command, caplog) lines = capture(command, caplog)
assert "module=fake.server.create_app_with_args" in lines assert "target=fake.server.create_app_with_args" in lines
def test_server_run_factory_with_args_arbitrary(caplog): def test_server_run_factory_with_args_arbitrary(caplog):
@ -81,25 +86,6 @@ def test_server_run_factory_with_args_arbitrary(caplog):
assert "foo=bar" in lines assert "foo=bar" in lines
def test_error_with_function_as_instance_without_factory_arg(caplog):
command = ["fake.server.create_app"]
lines = capture(command, caplog)
assert (
"Failed to run app: Module is not a Sanic app, it is a function\n "
"If this callable returns a Sanic instance try: \n"
"sanic fake.server.create_app --factory"
) in lines
def test_error_with_path_as_instance_without_simple_arg(caplog):
command = ["./fake/"]
lines = capture(command, caplog)
assert (
"Failed to run app: App not found.\n Please use --simple if you "
"are passing a directory to sanic.\n eg. sanic ./fake/ --simple"
) in lines
@pytest.mark.parametrize( @pytest.mark.parametrize(
"cmd", "cmd",
( (

View File

@ -1,11 +1,16 @@
from datetime import datetime, timedelta from datetime import datetime, timedelta
from http.cookies import SimpleCookie from http.cookies import SimpleCookie
from unittest.mock import Mock
import pytest import pytest
from sanic import Sanic from sanic import Request, Sanic
from sanic.cookies import Cookie from sanic.compat import Header
from sanic.cookies import Cookie, CookieJar
from sanic.cookies.request import CookieRequestParameters
from sanic.exceptions import ServerError
from sanic.response import text from sanic.response import text
from sanic.response.convenience import json
# ------------------------------------------------------------ # # ------------------------------------------------------------ #
@ -111,21 +116,23 @@ def test_cookie_options(app):
def test_cookie_deletion(app): def test_cookie_deletion(app):
cookie_jar = None
@app.route("/") @app.route("/")
def handler(request): def handler(request):
nonlocal cookie_jar
response = text("OK") response = text("OK")
del response.cookies["i_want_to_die"] del response.cookies["one"]
response.cookies["i_never_existed"] = "testing" response.cookies["two"] = "testing"
del response.cookies["i_never_existed"] del response.cookies["two"]
cookie_jar = response.cookies
return response return response
request, response = app.test_client.get("/") _, response = app.test_client.get("/")
response_cookies = SimpleCookie()
response_cookies.load(response.headers.get("Set-Cookie", {}))
assert int(response_cookies["i_want_to_die"]["max-age"]) == 0 assert cookie_jar.get_cookie("one").max_age == 0
with pytest.raises(KeyError): assert cookie_jar.get_cookie("two").max_age == 0
response.cookies["i_never_existed"] assert len(response.cookies) == 0
def test_cookie_reserved_cookie(): def test_cookie_reserved_cookie():
@ -252,3 +259,262 @@ def test_cookie_expires_illegal_instance_type(expires):
with pytest.raises(expected_exception=TypeError) as e: with pytest.raises(expected_exception=TypeError) as e:
c["expires"] = expires c["expires"] = expires
assert e.message == "Cookie 'expires' property must be a datetime" assert e.message == "Cookie 'expires' property must be a datetime"
@pytest.mark.parametrize("value", ("foo=one; foo=two", "foo=one;foo=two"))
def test_request_with_duplicate_cookie_key(value):
headers = Header({"Cookie": value})
request = Request(b"/", headers, "1.1", "GET", Mock(), Mock())
assert request.cookies["foo"] == "one"
assert request.cookies.get("foo") == "one"
assert request.cookies.getlist("foo") == ["one", "two"]
assert request.cookies.get("bar") is None
def test_cookie_jar_cookies():
headers = Header()
jar = CookieJar(headers)
jar.add_cookie("foo", "one")
jar.add_cookie("foo", "two", domain="example.com")
assert len(jar.cookies) == 2
assert len(headers) == 2
def test_cookie_jar_has_cookie():
headers = Header()
jar = CookieJar(headers)
jar.add_cookie("foo", "one")
jar.add_cookie("foo", "two", domain="example.com")
assert jar.has_cookie("foo")
assert jar.has_cookie("foo", domain="example.com")
assert not jar.has_cookie("foo", path="/unknown")
assert not jar.has_cookie("bar")
def test_cookie_jar_get_cookie():
headers = Header()
jar = CookieJar(headers)
cookie1 = jar.add_cookie("foo", "one")
cookie2 = jar.add_cookie("foo", "two", domain="example.com")
assert jar.get_cookie("foo") is cookie1
assert jar.get_cookie("foo", domain="example.com") is cookie2
assert jar.get_cookie("foo", path="/unknown") is None
assert jar.get_cookie("bar") is None
def test_cookie_jar_add_cookie_encode():
headers = Header()
jar = CookieJar(headers)
jar.add_cookie("foo", "one")
jar.add_cookie(
"foo",
"two",
domain="example.com",
path="/something",
secure=True,
max_age=999,
httponly=True,
samesite="strict",
)
jar.add_cookie("foo", "three", secure_prefix=True)
jar.add_cookie("foo", "four", host_prefix=True)
jar.add_cookie("foo", "five", host_prefix=True, partitioned=True)
encoded = [cookie.encode("ascii") for cookie in jar.cookies]
assert encoded == [
b"foo=one; Path=/; SameSite=Lax; Secure",
b"foo=two; Path=/something; Domain=example.com; Max-Age=999; SameSite=Strict; Secure; HttpOnly", # noqa
b"__Secure-foo=three; Path=/; SameSite=Lax; Secure",
b"__Host-foo=four; Path=/; SameSite=Lax; Secure",
b"__Host-foo=five; Path=/; SameSite=Lax; Secure; Partitioned",
]
def test_cookie_jar_old_school_cookie_encode():
headers = Header()
jar = CookieJar(headers)
jar["foo"] = "one"
jar["bar"] = "two"
jar["bar"]["domain"] = "example.com"
jar["bar"]["path"] = "/something"
jar["bar"]["secure"] = True
jar["bar"]["max-age"] = 999
jar["bar"]["httponly"] = True
jar["bar"]["samesite"] = "strict"
encoded = [cookie.encode("ascii") for cookie in jar.cookies]
assert encoded == [
b"foo=one; Path=/",
b"bar=two; Path=/something; Domain=example.com; Max-Age=999; SameSite=Strict; Secure; HttpOnly", # noqa
]
def test_cookie_jar_delete_cookie_encode():
headers = Header()
jar = CookieJar(headers)
jar.delete_cookie("foo")
jar.delete_cookie("foo", domain="example.com")
encoded = [cookie.encode("ascii") for cookie in jar.cookies]
assert encoded == [
b'foo=""; Path=/; Max-Age=0; Secure',
b'foo=""; Path=/; Domain=example.com; Max-Age=0; Secure',
]
def test_cookie_jar_old_school_delete_encode():
headers = Header()
jar = CookieJar(headers)
del jar["foo"]
encoded = [cookie.encode("ascii") for cookie in jar.cookies]
assert encoded == [
b'foo=""; Path=/; Max-Age=0; Secure',
]
def test_bad_cookie_prarms():
headers = Header()
jar = CookieJar(headers)
with pytest.raises(
ServerError,
match=(
"Both host_prefix and secure_prefix were requested. "
"A cookie should have only one prefix."
),
):
jar.add_cookie("foo", "bar", host_prefix=True, secure_prefix=True)
with pytest.raises(
ServerError,
match="Cannot set host_prefix on a cookie without secure=True",
):
jar.add_cookie("foo", "bar", host_prefix=True, secure=False)
with pytest.raises(
ServerError,
match="Cannot set host_prefix on a cookie unless path='/'",
):
jar.add_cookie(
"foo", "bar", host_prefix=True, secure=True, path="/foo"
)
with pytest.raises(
ServerError,
match="Cannot set host_prefix on a cookie with a defined domain",
):
jar.add_cookie(
"foo", "bar", host_prefix=True, secure=True, domain="foo.bar"
)
with pytest.raises(
ServerError,
match="Cannot set secure_prefix on a cookie without secure=True",
):
jar.add_cookie("foo", "bar", secure_prefix=True, secure=False)
with pytest.raises(
ServerError,
match=(
"Cannot create a partitioned cookie without "
"also setting host_prefix=True"
),
):
jar.add_cookie("foo", "bar", partitioned=True)
def test_cookie_accessors(app: Sanic):
@app.get("/")
async def handler(request: Request):
return json(
{
"getitem": {
"one": request.cookies["one"],
"two": request.cookies["two"],
"three": request.cookies["three"],
},
"get": {
"one": request.cookies.get("one", "fallback"),
"two": request.cookies.get("two", "fallback"),
"three": request.cookies.get("three", "fallback"),
"four": request.cookies.get("four", "fallback"),
},
"getlist": {
"one": request.cookies.getlist("one", ["fallback"]),
"two": request.cookies.getlist("two", ["fallback"]),
"three": request.cookies.getlist("three", ["fallback"]),
"four": request.cookies.getlist("four", ["fallback"]),
},
"getattr": {
"one": request.cookies.one,
"two": request.cookies.two,
"three": request.cookies.three,
"four": request.cookies.four,
},
}
)
_, response = app.test_client.get(
"/",
cookies={
"__Host-one": "1",
"__Secure-two": "2",
"three": "3",
},
)
assert response.json == {
"getitem": {
"one": "1",
"two": "2",
"three": "3",
},
"get": {
"one": "1",
"two": "2",
"three": "3",
"four": "fallback",
},
"getlist": {
"one": ["1"],
"two": ["2"],
"three": ["3"],
"four": ["fallback"],
},
"getattr": {
"one": "1",
"two": "2",
"three": "3",
"four": "",
},
}
def test_cookie_accessor_hyphens():
cookies = CookieRequestParameters({"session-token": ["abc123"]})
assert cookies.get("session-token") == cookies.session_token
def test_cookie_passthru(app):
cookie_jar = None
@app.route("/")
def handler(request):
nonlocal cookie_jar
response = text("OK")
response.add_cookie("one", "1", host_prefix=True)
response.delete_cookie("two", secure_prefix=True)
cookie_jar = response.cookies
return response
_, response = app.test_client.get("/")
assert cookie_jar.get_cookie("two", secure_prefix=True).max_age == 0
assert len(response.cookies) == 1
assert response.cookies["__Host-one"] == "1"

View File

@ -248,9 +248,9 @@ def test_fallback_with_content_type_mismatch_accept(app):
app.router.reset() app.router.reset()
@app.route("/alt1") @app.route("/alt1", name="alt1")
@app.route("/alt2", error_format="text") @app.route("/alt2", error_format="text", name="alt2")
@app.route("/alt3", error_format="html") @app.route("/alt3", error_format="html", name="alt3")
def handler(_): def handler(_):
raise Exception("problem here") raise Exception("problem here")
# Yes, we know this return value is unreachable. This is on purpose. # Yes, we know this return value is unreachable. This is on purpose.

View File

@ -285,9 +285,15 @@ def test_contextual_exception_context(debug):
def fail(): def fail():
raise TeapotError(context={"foo": "bar"}) raise TeapotError(context={"foo": "bar"})
app.post("/coffee/json", error_format="json")(lambda _: fail()) app.post("/coffee/json", error_format="json", name="json")(
app.post("/coffee/html", error_format="html")(lambda _: fail()) lambda _: fail()
app.post("/coffee/text", error_format="text")(lambda _: fail()) )
app.post("/coffee/html", error_format="html", name="html")(
lambda _: fail()
)
app.post("/coffee/text", error_format="text", name="text")(
lambda _: fail()
)
_, response = app.test_client.post("/coffee/json", debug=debug) _, response = app.test_client.post("/coffee/json", debug=debug)
assert response.status == 418 assert response.status == 418
@ -323,9 +329,15 @@ def test_contextual_exception_extra(debug):
def fail(): def fail():
raise TeapotError(extra={"foo": "bar"}) raise TeapotError(extra={"foo": "bar"})
app.post("/coffee/json", error_format="json")(lambda _: fail()) app.post("/coffee/json", error_format="json", name="json")(
app.post("/coffee/html", error_format="html")(lambda _: fail()) lambda _: fail()
app.post("/coffee/text", error_format="text")(lambda _: fail()) )
app.post("/coffee/html", error_format="html", name="html")(
lambda _: fail()
)
app.post("/coffee/text", error_format="text", name="text")(
lambda _: fail()
)
_, response = app.test_client.post("/coffee/json", debug=debug) _, response = app.test_client.post("/coffee/json", debug=debug)
assert response.status == 418 assert response.status == 418

View File

@ -266,20 +266,17 @@ def test_exception_handler_response_was_sent(
assert "Error" in response.text assert "Error" in response.text
def test_warn_on_duplicate( def test_errir_on_duplicate(app: Sanic):
app: Sanic, caplog: LogCaptureFixture, recwarn: WarningsRecorder
):
@app.exception(ServerError) @app.exception(ServerError)
async def exception_handler_1(request, exception): async def exception_handler_1(request, exception):
... ...
@app.exception(ServerError) message = (
async def exception_handler_2(request, exception):
...
assert len(caplog.records) == 1
assert len(recwarn) == 1
assert caplog.records[0].message == (
"Duplicate exception handler definition on: route=__ALL_ROUTES__ and " "Duplicate exception handler definition on: route=__ALL_ROUTES__ and "
"exception=<class 'sanic.exceptions.ServerError'>" "exception=<class 'sanic.exceptions.ServerError'>"
) )
with pytest.raises(ServerError, match=message):
@app.exception(ServerError)
async def exception_handler_2(request, exception):
...

View File

@ -98,3 +98,17 @@ def test_transfer_chunked(client):
data = stdjson.loads(body) data = stdjson.loads(body)
assert data == ["foo", "bar"] assert data == ["foo", "bar"]
def test_url_encoding(client):
client.send(
"""
GET /invalid\xA0url HTTP/1.1
"""
)
response = client.recv()
headers, body = response.rsplit(b"\r\n\r\n", 1)
assert b"400 Bad Request" in headers
assert b"URL may only contain US-ASCII characters." in body

View File

@ -49,96 +49,6 @@ def test_multiprocessing(app):
assert len(process_list) == num_workers + 1 assert len(process_list) == num_workers + 1
@pytest.mark.skipif(
not hasattr(signal, "SIGALRM"),
reason="SIGALRM is not implemented for this platform, we have to come "
"up with another timeout strategy to test these",
)
def test_multiprocessing_legacy(app):
"""Tests that the number of children we produce is correct"""
# Selects a number at random so we can spot check
num_workers = random.choice(range(2, multiprocessing.cpu_count() * 2 + 1))
process_list = set()
@app.after_server_start
async def shutdown(app):
await sleep(2.1)
app.stop()
def stop_on_alarm(*args):
for process in multiprocessing.active_children():
process_list.add(process.pid)
signal.signal(signal.SIGALRM, stop_on_alarm)
signal.alarm(2)
app.run(HOST, 4121, workers=num_workers, debug=True, legacy=True)
assert len(process_list) == num_workers
@pytest.mark.skipif(
not hasattr(signal, "SIGALRM"),
reason="SIGALRM is not implemented for this platform, we have to come "
"up with another timeout strategy to test these",
)
def test_multiprocessing_legacy_sock(app):
"""Tests that the number of children we produce is correct"""
# Selects a number at random so we can spot check
num_workers = random.choice(range(2, multiprocessing.cpu_count() * 2 + 1))
process_list = set()
@app.after_server_start
async def shutdown(app):
await sleep(2.1)
app.stop()
def stop_on_alarm(*args):
for process in multiprocessing.active_children():
process_list.add(process.pid)
signal.signal(signal.SIGALRM, stop_on_alarm)
signal.alarm(2)
sock = configure_socket(
{
"host": HOST,
"port": 4121,
"unix": None,
"backlog": 100,
}
)
app.run(workers=num_workers, debug=True, legacy=True, sock=sock)
sock.close()
assert len(process_list) == num_workers
@pytest.mark.skipif(
not hasattr(signal, "SIGALRM"),
reason="SIGALRM is not implemented for this platform, we have to come "
"up with another timeout strategy to test these",
)
def test_multiprocessing_legacy_unix(app):
"""Tests that the number of children we produce is correct"""
# Selects a number at random so we can spot check
num_workers = random.choice(range(2, multiprocessing.cpu_count() * 2 + 1))
process_list = set()
@app.after_server_start
async def shutdown(app):
await sleep(2.1)
app.stop()
def stop_on_alarm(*args):
for process in multiprocessing.active_children():
process_list.add(process.pid)
signal.signal(signal.SIGALRM, stop_on_alarm)
signal.alarm(2)
app.run(workers=num_workers, debug=True, legacy=True, unix="./test.sock")
assert len(process_list) == num_workers
@pytest.mark.skipif( @pytest.mark.skipif(
not hasattr(signal, "SIGALRM"), not hasattr(signal, "SIGALRM"),
reason="SIGALRM is not implemented for this platform", reason="SIGALRM is not implemented for this platform",

View File

@ -1,3 +1,5 @@
import uuid
from unittest.mock import Mock from unittest.mock import Mock
from uuid import UUID, uuid4 from uuid import UUID, uuid4
@ -5,7 +7,7 @@ import pytest
from sanic import Sanic, response from sanic import Sanic, response
from sanic.exceptions import BadURL, SanicException from sanic.exceptions import BadURL, SanicException
from sanic.request import Request, uuid from sanic.request import Request
from sanic.server import HttpProtocol from sanic.server import HttpProtocol

View File

@ -16,8 +16,9 @@ from sanic_testing.testing import (
) )
from sanic import Blueprint, Sanic from sanic import Blueprint, Sanic
from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE
from sanic.exceptions import ServerError from sanic.exceptions import ServerError
from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, RequestParameters from sanic.request import RequestParameters
from sanic.response import html, json, text from sanic.response import html, json, text
@ -104,11 +105,11 @@ def test_html(app):
return html("<h1>Hello</h1>") return html("<h1>Hello</h1>")
@app.route("/foo") @app.route("/foo")
async def handler(request): async def handler_foo(request):
return html(Foo()) return html(Foo())
@app.route("/bar") @app.route("/bar")
async def handler(request): async def handler_bar(request):
return html(Bar()) return html(Bar())
request, response = app.test_client.get("/") request, response = app.test_client.get("/")
@ -1813,8 +1814,8 @@ def test_request_cookies(app):
request, response = app.test_client.get("/", cookies=cookies) request, response = app.test_client.get("/", cookies=cookies)
assert request.cookies == cookies assert len(request.cookies) == len(cookies)
assert request.cookies == cookies # For request._cookies assert request.cookies["test"] == cookies["test"]
@pytest.mark.asyncio @pytest.mark.asyncio
@ -1827,8 +1828,8 @@ async def test_request_cookies_asgi(app):
request, response = await app.asgi_client.get("/", cookies=cookies) request, response = await app.asgi_client.get("/", cookies=cookies)
assert request.cookies == cookies assert len(request.cookies) == len(cookies)
assert request.cookies == cookies # For request._cookies assert request.cookies["test"] == cookies["test"]
def test_request_cookies_without_cookies(app): def test_request_cookies_without_cookies(app):
@ -2198,10 +2199,25 @@ def test_safe_method_with_body(app):
assert response.body == b"OK" assert response.body == b"OK"
def test_conflicting_body_methods_overload(app): @pytest.mark.asyncio
async def test_conflicting_body_methods_overload_error(app: Sanic):
@app.put("/") @app.put("/")
@app.put("/p/") @app.put("/p/")
@app.put("/p/<foo>") @app.put("/p/<foo>")
async def put(request, foo=None):
...
with pytest.raises(
ServerError,
match="Duplicate route names detected: test_conflicting_body_methods_overload_error\.put.*",
):
await app._startup()
def test_conflicting_body_methods_overload(app: Sanic):
@app.put("/", name="one")
@app.put("/p/", name="two")
@app.put("/p/<foo>", name="three")
async def put(request, foo=None): async def put(request, foo=None):
return json( return json(
{"name": request.route.name, "body": str(request.body), "foo": foo} {"name": request.route.name, "body": str(request.body), "foo": foo}
@ -2219,21 +2235,21 @@ def test_conflicting_body_methods_overload(app):
_, response = app.test_client.put("/", json=payload) _, response = app.test_client.put("/", json=payload)
assert response.status == 200 assert response.status == 200
assert response.json == { assert response.json == {
"name": "test_conflicting_body_methods_overload.put", "name": "test_conflicting_body_methods_overload.one",
"foo": None, "foo": None,
"body": data, "body": data,
} }
_, response = app.test_client.put("/p", json=payload) _, response = app.test_client.put("/p", json=payload)
assert response.status == 200 assert response.status == 200
assert response.json == { assert response.json == {
"name": "test_conflicting_body_methods_overload.put", "name": "test_conflicting_body_methods_overload.two",
"foo": None, "foo": None,
"body": data, "body": data,
} }
_, response = app.test_client.put("/p/test", json=payload) _, response = app.test_client.put("/p/test", json=payload)
assert response.status == 200 assert response.status == 200
assert response.json == { assert response.json == {
"name": "test_conflicting_body_methods_overload.put", "name": "test_conflicting_body_methods_overload.three",
"foo": "test", "foo": "test",
"body": data, "body": data,
} }
@ -2246,9 +2262,26 @@ def test_conflicting_body_methods_overload(app):
} }
def test_handler_overload(app): @pytest.mark.asyncio
async def test_handler_overload_error(app: Sanic):
@app.get("/long/sub/route/param_a/<param_a:str>/param_b/<param_b:str>") @app.get("/long/sub/route/param_a/<param_a:str>/param_b/<param_b:str>")
@app.post("/long/sub/route/") @app.post("/long/sub/route/")
def handler(request, **kwargs):
...
with pytest.raises(
ServerError,
match="Duplicate route names detected: test_handler_overload_error\.handler.*",
):
await app._startup()
def test_handler_overload(app: Sanic):
@app.get(
"/long/sub/route/param_a/<param_a:str>/param_b/<param_b:str>",
name="one",
)
@app.post("/long/sub/route/", name="two")
def handler(request, **kwargs): def handler(request, **kwargs):
return json(kwargs) return json(kwargs)

View File

@ -12,7 +12,7 @@ from sanic_testing.testing import SanicTestClient
from sanic import Blueprint, Sanic from sanic import Blueprint, Sanic
from sanic.constants import HTTP_METHODS from sanic.constants import HTTP_METHODS
from sanic.exceptions import NotFound, SanicException from sanic.exceptions import NotFound, SanicException, ServerError
from sanic.request import Request from sanic.request import Request
from sanic.response import empty, json, text from sanic.response import empty, json, text
@ -744,8 +744,8 @@ def test_route_duplicate(app):
def test_double_stack_route(app): def test_double_stack_route(app):
@app.route("/test/1") @app.route("/test/1", name="test1")
@app.route("/test/2") @app.route("/test/2", name="test2")
async def handler1(request): async def handler1(request):
return text("OK") return text("OK")
@ -759,8 +759,8 @@ def test_double_stack_route(app):
async def test_websocket_route_asgi(app): async def test_websocket_route_asgi(app):
ev = asyncio.Event() ev = asyncio.Event()
@app.websocket("/test/1") @app.websocket("/test/1", name="test1")
@app.websocket("/test/2") @app.websocket("/test/2", name="test2")
async def handler(request, ws): async def handler(request, ws):
ev.set() ev.set()
@ -1279,7 +1279,7 @@ async def test_added_callable_route_ctx_kwargs(app):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_duplicate_route_deprecation(app): async def test_duplicate_route_error(app):
@app.route("/foo", name="duped") @app.route("/foo", name="duped")
async def handler_foo(request): async def handler_foo(request):
return text("...") return text("...")
@ -1289,9 +1289,7 @@ async def test_duplicate_route_deprecation(app):
return text("...") return text("...")
message = ( message = (
r"\[DEPRECATION v23\.3\] Duplicate route names detected: " "Duplicate route names detected: test_duplicate_route_error.duped."
r"test_duplicate_route_deprecation\.duped\. In the future, "
r"Sanic will enforce uniqueness in route naming\."
) )
with pytest.warns(DeprecationWarning, match=message): with pytest.raises(ServerError, match=message):
await app._startup() await app._startup()

View File

@ -66,8 +66,8 @@ def test_no_register_system_signals_fails(app):
app.listener("after_server_stop")(after) app.listener("after_server_stop")(after)
message = ( message = (
"Cannot run Sanic.serve with register_sys_signals=False. Use " r"Cannot run Sanic\.serve with register_sys_signals=False\. Use "
"either Sanic.serve_single or Sanic.serve_legacy." r"Sanic.serve_single\."
) )
with pytest.raises(RuntimeError, match=message): with pytest.raises(RuntimeError, match=message):
app.prepare(HOST, PORT, register_sys_signals=False) app.prepare(HOST, PORT, register_sys_signals=False)

View File

@ -9,7 +9,7 @@ from time import gmtime, strftime
import pytest import pytest
from sanic import Sanic, text from sanic import Sanic, text
from sanic.exceptions import FileNotFound from sanic.exceptions import FileNotFound, ServerError
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
@ -108,14 +108,9 @@ def test_static_file_pathlib(app, static_file_directory, file_name):
def test_static_file_bytes(app, static_file_directory, file_name): def test_static_file_bytes(app, static_file_directory, file_name):
bsep = os.path.sep.encode("utf-8") bsep = os.path.sep.encode("utf-8")
file_path = static_file_directory.encode("utf-8") + bsep + file_name file_path = static_file_directory.encode("utf-8") + bsep + file_name
message = ( message = "Static file or directory must be a path-like object or string"
"Serving a static directory with a bytes " with pytest.raises(TypeError, match=message):
"string is deprecated and will be removed in v22.9."
)
with pytest.warns(DeprecationWarning, match=message):
app.static("/testing.file", file_path) app.static("/testing.file", file_path)
request, response = app.test_client.get("/testing.file")
assert response.status == 200
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -523,10 +518,26 @@ def test_no_stack_trace_on_not_found(app, static_file_directory, caplog):
assert response.text == "No file: /static/non_existing_file.file" assert response.text == "No file: /static/non_existing_file.file"
def test_multiple_statics(app, static_file_directory): @pytest.mark.asyncio
async def test_multiple_statics_error(app, static_file_directory):
app.static("/file", get_file_path(static_file_directory, "test.file")) app.static("/file", get_file_path(static_file_directory, "test.file"))
app.static("/png", get_file_path(static_file_directory, "python.png")) app.static("/png", get_file_path(static_file_directory, "python.png"))
message = (
r"Duplicate route names detected: test_multiple_statics_error\.static"
)
with pytest.raises(ServerError, match=message):
await app._startup()
def test_multiple_statics(app, static_file_directory):
app.static(
"/file", get_file_path(static_file_directory, "test.file"), name="file"
)
app.static(
"/png", get_file_path(static_file_directory, "python.png"), name="png"
)
_, response = app.test_client.get("/file") _, response = app.test_client.get("/file")
assert response.status == 200 assert response.status == 200
assert response.body == get_file_content( assert response.body == get_file_content(
@ -540,10 +551,22 @@ def test_multiple_statics(app, static_file_directory):
) )
def test_resource_type_default(app, static_file_directory): @pytest.mark.asyncio
async def test_resource_type_default_error(app, static_file_directory):
app.static("/static", static_file_directory) app.static("/static", static_file_directory)
app.static("/file", get_file_path(static_file_directory, "test.file")) app.static("/file", get_file_path(static_file_directory, "test.file"))
message = r"Duplicate route names detected: test_resource_type_default_error\.static"
with pytest.raises(ServerError, match=message):
await app._startup()
def test_resource_type_default(app, static_file_directory):
app.static("/static", static_file_directory, name="static")
app.static(
"/file", get_file_path(static_file_directory, "test.file"), name="file"
)
_, response = app.test_client.get("/static") _, response = app.test_client.get("/static")
assert response.status == 404 assert response.status == 404

View File

@ -52,34 +52,23 @@ def test_cwd_in_path():
def test_input_is_dir(): def test_input_is_dir():
loader = AppLoader(str(STATIC)) loader = AppLoader(str(STATIC))
message = ( app = loader.load()
"App not found.\n Please use --simple if you are passing a " assert isinstance(app, Sanic)
f"directory to sanic.\n eg. sanic {str(STATIC)} --simple"
)
with pytest.raises(ValueError, match=message):
loader.load()
def test_input_is_factory(): def test_input_is_factory():
ns = SimpleNamespace(module="foo") ns = SimpleNamespace(target="foo")
loader = AppLoader("tests.fake.server:create_app", args=ns) loader = AppLoader("tests.fake.server:create_app", args=ns)
message = ( app = loader.load()
"Module is not a Sanic app, it is a function\n If this callable " assert isinstance(app, Sanic)
"returns a Sanic instance try: \nsanic foo --factory"
)
with pytest.raises(ValueError, match=message):
loader.load()
def test_input_is_module(): def test_input_is_module():
ns = SimpleNamespace(module="foo") ns = SimpleNamespace(target="foo")
loader = AppLoader("tests.fake.server", args=ns) loader = AppLoader("tests.fake.server", args=ns)
message = (
"Module is not a Sanic app, it is a module\n " app = loader.load()
"Perhaps you meant foo:app?" assert isinstance(app, Sanic)
)
with pytest.raises(ValueError, match=message):
loader.load()
@pytest.mark.parametrize("creator", ("mkcert", "trustme")) @pytest.mark.parametrize("creator", ("mkcert", "trustme"))

View File

@ -72,24 +72,6 @@ def test_not_have_multiplexer_single(app: Sanic):
assert not event.is_set() assert not event.is_set()
def test_not_have_multiplexer_legacy(app: Sanic):
event = Event()
@app.main_process_start
async def setup(app, _):
app.shared_ctx.event = event
@app.after_server_start
def stop(app):
if hasattr(app, "m") and isinstance(app.m, WorkerMultiplexer):
app.shared_ctx.event.set()
app.stop()
app.run(legacy=True)
assert not event.is_set()
def test_ack(worker_state: Dict[str, Any], m: WorkerMultiplexer): def test_ack(worker_state: Dict[str, Any], m: WorkerMultiplexer):
worker_state["Test"] = {"foo": "bar"} worker_state["Test"] = {"foo": "bar"}
m.ack() m.ack()