ruff-only #1

Open
leo wants to merge 17 commits from ruff-only into main
81 changed files with 696 additions and 793 deletions
Showing only changes of commit 65ba1942cc - Show all commits

View File

@ -1,5 +1,4 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Sanic documentation build configuration file, created by
# sphinx-quickstart on Sun Dec 25 18:07:21 2016.

View File

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
import asyncio

View File

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
from functools import wraps

View File

@ -48,7 +48,7 @@ app = Sanic("Example")
@app.middleware
def log_request(request: Request):
logdna.info("I was Here with a new Request to URL: {}".format(request.url))
logdna.info(f"I was Here with a new Request to URL: {request.url}")
@app.route("/")

View File

@ -20,7 +20,7 @@ def test_sync(request):
@app.route("/dynamic/<name>/<i:int>")
def test_params(request, name, i):
return response.text("yeehaww {} {}".format(name, i))
return response.text(f"yeehaww {name} {i}")
@app.route("/exception")

View File

@ -14,7 +14,7 @@ async def index(request):
@app.route("/posts/<post_id>")
async def post_handler(request, post_id):
return response.text("Post - {}".format(post_id))
return response.text(f"Post - {post_id}")
if __name__ == "__main__":

View File

@ -2,7 +2,6 @@ from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Type
from frontmatter import parse
@ -15,7 +14,7 @@ from .docobject import organize_docobjects
_PAGE_CACHE: dict[
str, dict[str, tuple[Page | None, Page | None, Page | None]]
] = {}
_LAYOUTS_CACHE: dict[str, Type[BaseLayout]] = {
_LAYOUTS_CACHE: dict[str, type[BaseLayout]] = {
"home": HomeLayout,
"main": MainLayout,
}
@ -43,7 +42,7 @@ class Page:
DEFAULT_LANGUAGE = _DEFAULT
def get_layout(self) -> Type[BaseLayout]:
def get_layout(self) -> type[BaseLayout]:
return _LAYOUTS_CACHE[self.meta.layout]
@property

View File

@ -1,7 +1,6 @@
from __future__ import annotations
from contextlib import contextmanager
from typing import Type
from html5tagger import HTML, Builder # type: ignore
from sanic import Request
@ -38,7 +37,7 @@ class PageRenderer(BaseRenderer):
@contextmanager
def _base(self, request: Request, builder: Builder, page: Page | None):
layout_type: Type[BaseLayout] = (
layout_type: type[BaseLayout] = (
page.get_layout() if page else BaseLayout
)
layout = layout_type(builder)

View File

@ -3,7 +3,8 @@ requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[tool.ruff]
extend-select = ["I", "W"]
extend-select = ["I", "W", "UP", "C4"]
# Worth selecting but still too broken: ASYNC, S, B
ignore = ["D100", "D101", "D102", "D103", "E402", "E741", "F811", "F821"]
line-length = 79
show-source = true

View File

@ -32,19 +32,12 @@ from typing import (
Callable,
ClassVar,
Coroutine,
Deque,
Dict,
Generic,
Iterable,
Iterator,
List,
Literal,
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
cast,
overload,
)
@ -173,7 +166,7 @@ class Sanic(
"websocket_tasks",
)
_app_registry: ClassVar[Dict[str, "Sanic"]] = {}
_app_registry: ClassVar[dict[str, Sanic]] = {}
test_mode: ClassVar[bool] = False
@overload
@ -182,19 +175,19 @@ class Sanic(
name: str,
config: None = None,
ctx: None = None,
router: Optional[Router] = None,
signal_router: Optional[SignalRouter] = None,
error_handler: Optional[ErrorHandler] = None,
env_prefix: Optional[str] = SANIC_PREFIX,
request_class: Optional[Type[Request]] = None,
router: Router | None = None,
signal_router: SignalRouter | None = None,
error_handler: ErrorHandler | None = None,
env_prefix: str | None = SANIC_PREFIX,
request_class: type[Request] | None = None,
strict_slashes: bool = False,
log_config: Optional[Dict[str, Any]] = None,
log_config: dict[str, Any] | None = None,
configure_logging: bool = True,
dumps: Optional[Callable[..., AnyStr]] = None,
loads: Optional[Callable[..., Any]] = None,
dumps: Callable[..., AnyStr] | None = None,
loads: Callable[..., Any] | None = None,
inspector: bool = False,
inspector_class: Optional[Type[Inspector]] = None,
certloader_class: Optional[Type[CertLoader]] = None,
inspector_class: type[Inspector] | None = None,
certloader_class: type[CertLoader] | None = None,
) -> None:
...
@ -202,21 +195,21 @@ class Sanic(
def __init__(
self: Sanic[config_type, SimpleNamespace],
name: str,
config: Optional[config_type] = None,
config: config_type | None = None,
ctx: None = None,
router: Optional[Router] = None,
signal_router: Optional[SignalRouter] = None,
error_handler: Optional[ErrorHandler] = None,
env_prefix: Optional[str] = SANIC_PREFIX,
request_class: Optional[Type[Request]] = None,
router: Router | None = None,
signal_router: SignalRouter | None = None,
error_handler: ErrorHandler | None = None,
env_prefix: str | None = SANIC_PREFIX,
request_class: type[Request] | None = None,
strict_slashes: bool = False,
log_config: Optional[Dict[str, Any]] = None,
log_config: dict[str, Any] | None = None,
configure_logging: bool = True,
dumps: Optional[Callable[..., AnyStr]] = None,
loads: Optional[Callable[..., Any]] = None,
dumps: Callable[..., AnyStr] | None = None,
loads: Callable[..., Any] | None = None,
inspector: bool = False,
inspector_class: Optional[Type[Inspector]] = None,
certloader_class: Optional[Type[CertLoader]] = None,
inspector_class: type[Inspector] | None = None,
certloader_class: type[CertLoader] | None = None,
) -> None:
...
@ -225,20 +218,20 @@ class Sanic(
self: Sanic[Config, ctx_type],
name: str,
config: None = None,
ctx: Optional[ctx_type] = None,
router: Optional[Router] = None,
signal_router: Optional[SignalRouter] = None,
error_handler: Optional[ErrorHandler] = None,
env_prefix: Optional[str] = SANIC_PREFIX,
request_class: Optional[Type[Request]] = None,
ctx: ctx_type | None = None,
router: Router | None = None,
signal_router: SignalRouter | None = None,
error_handler: ErrorHandler | None = None,
env_prefix: str | None = SANIC_PREFIX,
request_class: type[Request] | None = None,
strict_slashes: bool = False,
log_config: Optional[Dict[str, Any]] = None,
log_config: dict[str, Any] | None = None,
configure_logging: bool = True,
dumps: Optional[Callable[..., AnyStr]] = None,
loads: Optional[Callable[..., Any]] = None,
dumps: Callable[..., AnyStr] | None = None,
loads: Callable[..., Any] | None = None,
inspector: bool = False,
inspector_class: Optional[Type[Inspector]] = None,
certloader_class: Optional[Type[CertLoader]] = None,
inspector_class: type[Inspector] | None = None,
certloader_class: type[CertLoader] | None = None,
) -> None:
...
@ -246,42 +239,42 @@ class Sanic(
def __init__(
self: Sanic[config_type, ctx_type],
name: str,
config: Optional[config_type] = None,
ctx: Optional[ctx_type] = None,
router: Optional[Router] = None,
signal_router: Optional[SignalRouter] = None,
error_handler: Optional[ErrorHandler] = None,
env_prefix: Optional[str] = SANIC_PREFIX,
request_class: Optional[Type[Request]] = None,
config: config_type | None = None,
ctx: ctx_type | None = None,
router: Router | None = None,
signal_router: SignalRouter | None = None,
error_handler: ErrorHandler | None = None,
env_prefix: str | None = SANIC_PREFIX,
request_class: type[Request] | None = None,
strict_slashes: bool = False,
log_config: Optional[Dict[str, Any]] = None,
log_config: dict[str, Any] | None = None,
configure_logging: bool = True,
dumps: Optional[Callable[..., AnyStr]] = None,
loads: Optional[Callable[..., Any]] = None,
dumps: Callable[..., AnyStr] | None = None,
loads: Callable[..., Any] | None = None,
inspector: bool = False,
inspector_class: Optional[Type[Inspector]] = None,
certloader_class: Optional[Type[CertLoader]] = None,
inspector_class: type[Inspector] | None = None,
certloader_class: type[CertLoader] | None = None,
) -> None:
...
def __init__(
self,
name: str,
config: Optional[config_type] = None,
ctx: Optional[ctx_type] = None,
router: Optional[Router] = None,
signal_router: Optional[SignalRouter] = None,
error_handler: Optional[ErrorHandler] = None,
env_prefix: Optional[str] = SANIC_PREFIX,
request_class: Optional[Type[Request]] = None,
config: config_type | None = None,
ctx: ctx_type | None = None,
router: Router | None = None,
signal_router: SignalRouter | None = None,
error_handler: ErrorHandler | None = None,
env_prefix: str | None = SANIC_PREFIX,
request_class: type[Request] | None = None,
strict_slashes: bool = False,
log_config: Optional[Dict[str, Any]] = None,
log_config: dict[str, Any] | None = None,
configure_logging: bool = True,
dumps: Optional[Callable[..., AnyStr]] = None,
loads: Optional[Callable[..., Any]] = None,
dumps: Callable[..., AnyStr] | None = None,
loads: Callable[..., Any] | None = None,
inspector: bool = False,
inspector_class: Optional[Type[Inspector]] = None,
certloader_class: Optional[Type[CertLoader]] = None,
inspector_class: type[Inspector] | None = None,
certloader_class: type[CertLoader] | None = None,
) -> None:
super().__init__(name=name)
# logging
@ -303,41 +296,41 @@ class Sanic(
self.config.INSPECTOR = inspector
# Then we can do the rest
self._asgi_app: Optional[ASGIApp] = None
self._asgi_lifespan: Optional[Lifespan] = None
self._asgi_app: ASGIApp | None = None
self._asgi_lifespan: Lifespan | None = None
self._asgi_client: Any = None
self._blueprint_order: List[Blueprint] = []
self._delayed_tasks: List[str] = []
self._blueprint_order: list[Blueprint] = []
self._delayed_tasks: list[str] = []
self._future_registry: FutureRegistry = FutureRegistry()
self._inspector: Optional[Inspector] = None
self._manager: Optional[WorkerManager] = None
self._inspector: Inspector | None = None
self._manager: WorkerManager | None = None
self._state: ApplicationState = ApplicationState(app=self)
self._task_registry: Dict[str, Union[Task, None]] = {}
self._task_registry: dict[str, Task | None] = {}
self._test_client: Any = None
self._test_manager: Any = None
self.asgi = False
self.auto_reload = False
self.blueprints: Dict[str, Blueprint] = {}
self.certloader_class: Type[CertLoader] = (
self.blueprints: dict[str, Blueprint] = {}
self.certloader_class: type[CertLoader] = (
certloader_class or CertLoader
)
self.configure_logging: bool = configure_logging
self.ctx: ctx_type = cast(ctx_type, ctx or SimpleNamespace())
self.error_handler: ErrorHandler = error_handler or ErrorHandler()
self.inspector_class: Type[Inspector] = inspector_class or Inspector
self.listeners: Dict[str, List[ListenerType[Any]]] = defaultdict(list)
self.named_request_middleware: Dict[str, Deque[Middleware]] = {}
self.named_response_middleware: Dict[str, Deque[Middleware]] = {}
self.request_class: Type[Request] = request_class or Request
self.request_middleware: Deque[Middleware] = deque()
self.response_middleware: Deque[Middleware] = deque()
self.inspector_class: type[Inspector] = inspector_class or Inspector
self.listeners: dict[str, list[ListenerType[Any]]] = defaultdict(list)
self.named_request_middleware: dict[str, deque[Middleware]] = {}
self.named_response_middleware: dict[str, deque[Middleware]] = {}
self.request_class: type[Request] = request_class or Request
self.request_middleware: deque[Middleware] = deque()
self.response_middleware: deque[Middleware] = deque()
self.router: Router = router or Router()
self.shared_ctx: SharedContext = SharedContext()
self.signal_router: SignalRouter = signal_router or SignalRouter()
self.sock: Optional[socket] = None
self.sock: socket | None = None
self.strict_slashes: bool = strict_slashes
self.websocket_enabled: bool = False
self.websocket_tasks: Set[Future[Any]] = set()
self.websocket_tasks: set[Future[Any]] = set()
# Register alternative method names
self.go_fast = self.run
@ -397,7 +390,7 @@ class Sanic(
_event = ListenerEvent[event.upper()]
except (ValueError, AttributeError):
valid = ", ".join(
map(lambda x: x.lower(), ListenerEvent.__members__.keys())
x.lower() for x in ListenerEvent.__members__.keys()
)
raise BadRequest(f"Invalid event: {event}. Use one of: {valid}")
@ -412,11 +405,11 @@ class Sanic(
def register_middleware(
self,
middleware: Union[MiddlewareType, Middleware],
middleware: MiddlewareType | Middleware,
attach_to: str = "request",
*,
priority: Union[Default, int] = _default,
) -> Union[MiddlewareType, Middleware]:
priority: Default | int = _default,
) -> MiddlewareType | Middleware:
"""Register a middleware to be called before a request is handled.
Args:
@ -461,7 +454,7 @@ class Sanic(
route_names: Iterable[str],
attach_to: str = "request",
*,
priority: Union[Default, int] = _default,
priority: Default | int = _default,
):
"""Used to register named middleqare (middleware typically on blueprints)
@ -512,7 +505,7 @@ class Sanic(
def _apply_exception_handler(
self,
handler: FutureException,
route_names: Optional[List[str]] = None,
route_names: list[str] | None = None,
):
"""Decorate a function to be registered as a handler for exceptions
@ -533,7 +526,7 @@ class Sanic(
def _apply_route(
self, route: FutureRoute, overwrite: bool = False
) -> List[Route]:
) -> list[Route]:
params = route._asdict()
params["overwrite"] = overwrite
websocket = params.pop("websocket", False)
@ -567,7 +560,7 @@ class Sanic(
def _apply_middleware(
self,
middleware: FutureMiddleware,
route_names: Optional[List[str]] = None,
route_names: list[str] | None = None,
):
with self.amend():
if route_names:
@ -588,8 +581,8 @@ class Sanic(
self,
event: str,
*,
condition: Optional[Dict[str, str]] = None,
context: Optional[Dict[str, Any]] = None,
condition: dict[str, str] | None = None,
context: dict[str, Any] | None = None,
fail_not_found: bool = True,
inline: Literal[True],
reverse: bool = False,
@ -601,8 +594,8 @@ class Sanic(
self,
event: str,
*,
condition: Optional[Dict[str, str]] = None,
context: Optional[Dict[str, Any]] = None,
condition: dict[str, str] | None = None,
context: dict[str, Any] | None = None,
fail_not_found: bool = True,
inline: Literal[False] = False,
reverse: bool = False,
@ -613,12 +606,12 @@ class Sanic(
self,
event: str,
*,
condition: Optional[Dict[str, str]] = None,
context: Optional[Dict[str, Any]] = None,
condition: dict[str, str] | None = None,
context: dict[str, Any] | None = None,
fail_not_found: bool = True,
inline: bool = False,
reverse: bool = False,
) -> Coroutine[Any, Any, Awaitable[Union[Task, Any]]]:
) -> Coroutine[Any, Any, Awaitable[Task | Any]]:
"""Dispatches an event to the signal router.
Args:
@ -663,7 +656,7 @@ class Sanic(
)
async def event(
self, event: str, timeout: Optional[Union[int, float]] = None
self, event: str, timeout: int | float | None = None
) -> None:
"""Wait for a specific event to be triggered.
@ -780,13 +773,13 @@ class Sanic(
def blueprint(
self,
blueprint: Union[Blueprint, Iterable[Blueprint], BlueprintGroup],
blueprint: Blueprint | (Iterable[Blueprint] | BlueprintGroup),
*,
url_prefix: Optional[str] = None,
version: Optional[Union[int, float, str]] = None,
strict_slashes: Optional[bool] = None,
version_prefix: Optional[str] = None,
name_prefix: Optional[str] = None,
url_prefix: str | None = None,
version: int | (float | str) | None = None,
strict_slashes: bool | None = None,
version_prefix: str | None = None,
name_prefix: str | None = None,
) -> None:
"""Register a blueprint on the application.
@ -812,7 +805,7 @@ class Sanic(
app.blueprint(bp, url_prefix='/blueprint')
```
""" # noqa: E501
options: Dict[str, Any] = {}
options: dict[str, Any] = {}
if url_prefix is not None:
options["url_prefix"] = url_prefix
if version is not None:
@ -825,7 +818,7 @@ class Sanic(
options["name_prefix"] = name_prefix
if isinstance(blueprint, (Iterable, BlueprintGroup)):
for item in blueprint:
params: Dict[str, Any] = {**options}
params: dict[str, Any] = {**options}
if isinstance(blueprint, BlueprintGroup):
merge_from = [
options.get("url_prefix", ""),
@ -857,8 +850,8 @@ class Sanic(
return
if blueprint.name in self.blueprints:
assert self.blueprints[blueprint.name] is blueprint, (
'A blueprint with the name "%s" is already registered. '
"Blueprint names must be unique." % (blueprint.name,)
'A blueprint with the name "{}" is already registered. '
"Blueprint names must be unique.".format(blueprint.name)
)
else:
self.blueprints[blueprint.name] = blueprint
@ -923,7 +916,7 @@ class Sanic(
# http://subdomain.example.com/view-name
""" # noqa: E501
# find the route by the supplied view name
kw: Dict[str, str] = {}
kw: dict[str, str] = {}
# special static files url_for
if "." not in view_name:
@ -1221,13 +1214,7 @@ class Sanic(
# Define `response` var here to remove warnings about
# allocation before assignment below.
response: Optional[
Union[
BaseHTTPResponse,
Coroutine[Any, Any, Optional[BaseHTTPResponse]],
ResponseStream,
]
] = None
response: BaseHTTPResponse | (Coroutine[Any, Any, BaseHTTPResponse | None] | ResponseStream) | None = None
run_middleware = True
try:
await self.dispatch(
@ -1285,10 +1272,10 @@ class Sanic(
if handler is None:
raise ServerError(
(
"'None' was returned while requesting a "
"handler from the router"
)
)
# Run response handler
@ -1566,7 +1553,7 @@ class Sanic(
app,
loop,
*,
name: Optional[str] = None,
name: str | None = None,
register: bool = True,
) -> Task:
if not isinstance(task, Future):
@ -1628,11 +1615,11 @@ class Sanic(
def add_task(
self,
task: Union[Future[Any], Coroutine[Any, Any, Any], Awaitable[Any]],
task: Future[Any] | (Coroutine[Any, Any, Any] | Awaitable[Any]),
*,
name: Optional[str] = None,
name: str | None = None,
register: bool = True,
) -> Optional[Task[Any]]:
) -> Task[Any] | None:
"""Schedule a task to run later, after the loop has started.
While this is somewhat similar to `asyncio.create_task`, it can be
@ -1681,16 +1668,16 @@ class Sanic(
@overload
def get_task(
self, name: str, *, raise_exception: Literal[False]
) -> Optional[Task]:
) -> Task | None:
...
@overload
def get_task(self, name: str, *, raise_exception: bool) -> Optional[Task]:
def get_task(self, name: str, *, raise_exception: bool) -> Task | None:
...
def get_task(
self, name: str, *, raise_exception: bool = True
) -> Optional[Task]:
) -> Task | None:
"""Get a named task.
This method is used to get a task by its name. Optionally, you can
@ -1716,7 +1703,7 @@ class Sanic(
async def cancel_task(
self,
name: str,
msg: Optional[str] = None,
msg: str | None = None,
*,
raise_exception: bool = True,
) -> None:
@ -1751,7 +1738,7 @@ class Sanic(
""" # noqa: E501
task = self.get_task(name, raise_exception=raise_exception)
if task and not task.cancelled():
args: Tuple[str, ...] = ()
args: tuple[str, ...] = ()
if msg:
if sys.version_info >= (3, 9):
args = (msg,)
@ -1784,7 +1771,7 @@ class Sanic(
}
def shutdown_tasks(
self, timeout: Optional[float] = None, increment: float = 0.1
self, timeout: float | None = None, increment: float = 0.1
) -> None:
"""Cancel all tasks except the server task.
@ -1853,7 +1840,7 @@ class Sanic(
# Configuration
# -------------------------------------------------------------------- #
def update_config(self, config: Union[bytes, str, dict, Any]) -> None:
def update_config(self, config: bytes | (str | (dict | Any))) -> None:
"""Update the application configuration.
This method is used to update the application configuration. It can
@ -1903,7 +1890,7 @@ class Sanic(
return self._state
@property
def reload_dirs(self) -> Set[Path]:
def reload_dirs(self) -> set[Path]:
"""The directories that are monitored for auto-reload.
Returns:
@ -1948,9 +1935,9 @@ class Sanic(
def extend(
self,
*,
extensions: Optional[List[Type[Extension]]] = None,
extensions: list[type[Extension]] | None = None,
built_in_extensions: bool = True,
config: Optional[Union[Config, Dict[str, Any]]] = None,
config: Config | dict[str, Any] | None = None,
**kwargs,
) -> Extend:
"""Extend Sanic with additional functionality using Sanic Extensions.
@ -2069,7 +2056,7 @@ class Sanic(
@classmethod
def get_app(
cls, name: Optional[str] = None, *, force_create: bool = False
cls, name: str | None = None, *, force_create: bool = False
) -> Sanic:
"""Retrieve an instantiated Sanic instance by name.
@ -2316,7 +2303,7 @@ class Sanic(
self,
concern: str,
action: str,
loop: Optional[AbstractEventLoop] = None,
loop: AbstractEventLoop | None = None,
) -> None:
event = f"server.{concern}.{action}"
if action not in ("before", "after") or concern not in (
@ -2347,7 +2334,7 @@ class Sanic(
def refresh(
self,
passthru: Optional[Dict[str, Any]] = None,
passthru: dict[str, Any] | None = None,
) -> Sanic:
"""Refresh the application instance. **This is used internally by Sanic**.

View File

@ -47,8 +47,7 @@ class Spinner: # noqa
@staticmethod
def cursor():
while True:
for cursor in "|/-\\":
yield cursor
yield from "|/-\\"
@staticmethod
def hide():

View File

@ -6,7 +6,7 @@ from dataclasses import dataclass, field
from pathlib import Path
from socket import socket
from ssl import SSLContext
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Union
from typing import TYPE_CHECKING, Any
from sanic.application.constants import Mode, Server, ServerStage
from sanic.log import VerbosityFilter, logger
@ -21,9 +21,9 @@ if TYPE_CHECKING:
class ApplicationServerInfo:
"""Information about a server instance."""
settings: Dict[str, Any]
settings: dict[str, Any]
stage: ServerStage = field(default=ServerStage.STOPPED)
server: Optional[AsyncioServer] = field(default=None)
server: AsyncioServer | None = field(default=None)
@dataclass
@ -40,11 +40,11 @@ class ApplicationState:
fast: bool = field(default=False)
host: str = field(default="")
port: int = field(default=0)
ssl: Optional[SSLContext] = field(default=None)
sock: Optional[socket] = field(default=None)
unix: Optional[str] = field(default=None)
ssl: SSLContext | None = field(default=None)
sock: socket | None = field(default=None)
unix: str | None = field(default=None)
mode: Mode = field(default=Mode.PRODUCTION)
reload_dirs: Set[Path] = field(default_factory=set)
reload_dirs: set[Path] = field(default_factory=set)
auto_reload: bool = field(default=False)
server: Server = field(default=Server.SANIC)
is_running: bool = field(default=False)
@ -53,7 +53,7 @@ class ApplicationState:
verbosity: int = field(default=0)
workers: int = field(default=0)
primary: bool = field(default=True)
server_info: List[ApplicationServerInfo] = field(default_factory=list)
server_info: list[ApplicationServerInfo] = field(default_factory=list)
# This property relates to the ApplicationState instance and should
# not be changed except in the __post_init__ method
@ -71,7 +71,7 @@ class ApplicationState:
if self._init and hasattr(self, f"set_{name}"):
getattr(self, f"set_{name}")(value)
def set_mode(self, value: Union[str, Mode]):
def set_mode(self, value: str | Mode):
if hasattr(self.app, "error_handler"):
self.app.error_handler.debug = self.app.debug
if getattr(self.app, "configure_logging", False) and self.app.debug:

View File

@ -2,7 +2,7 @@ from __future__ import annotations
import warnings
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING
from sanic.compat import Header
from sanic.exceptions import BadRequest, ServerError
@ -109,9 +109,9 @@ class ASGIApp:
request: Request
transport: MockTransport
lifespan: Lifespan
ws: Optional[WebSocketConnection]
ws: WebSocketConnection | None
stage: Stage
response: Optional[BaseHTTPResponse]
response: BaseHTTPResponse | None
@classmethod
async def create(
@ -189,7 +189,7 @@ class ASGIApp:
return instance
async def read(self) -> Optional[bytes]:
async def read(self) -> bytes | None:
"""
Read and stream the body in chunks from an incoming ASGI message.
"""

View File

@ -14,15 +14,9 @@ from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Optional,
Sequence,
Set,
Tuple,
Union,
overload,
)
@ -122,10 +116,10 @@ class Blueprint(BaseSanic):
def __init__(
self,
name: str,
url_prefix: Optional[str] = None,
host: Optional[Union[List[str], str]] = None,
version: Optional[Union[int, str, float]] = None,
strict_slashes: Optional[bool] = None,
url_prefix: str | None = None,
host: list[str] | str | None = None,
version: int | (str | float) | None = None,
strict_slashes: bool | None = None,
version_prefix: str = "/v",
):
super().__init__(name=name)
@ -161,7 +155,7 @@ class Blueprint(BaseSanic):
return f"Blueprint({args})"
@property
def apps(self) -> Set[Sanic]:
def apps(self) -> set[Sanic]:
"""Get the set of apps that this blueprint is registered to.
Returns:
@ -196,23 +190,23 @@ class Blueprint(BaseSanic):
def reset(self) -> None:
"""Reset the blueprint to its initial state."""
self._apps: Set[Sanic] = set()
self._apps: set[Sanic] = set()
self._allow_route_overwrite = False
self.exceptions: List[RouteHandler] = []
self.listeners: Dict[str, List[ListenerType[Any]]] = {}
self.middlewares: List[MiddlewareType] = []
self.routes: List[Route] = []
self.statics: List[RouteHandler] = []
self.websocket_routes: List[Route] = []
self.exceptions: list[RouteHandler] = []
self.listeners: dict[str, list[ListenerType[Any]]] = {}
self.middlewares: list[MiddlewareType] = []
self.routes: list[Route] = []
self.statics: list[RouteHandler] = []
self.websocket_routes: list[Route] = []
def copy(
self,
name: str,
url_prefix: Optional[Union[str, Default]] = _default,
version: Optional[Union[int, str, float, Default]] = _default,
version_prefix: Union[str, Default] = _default,
allow_route_overwrite: Union[bool, Default] = _default,
strict_slashes: Optional[Union[bool, Default]] = _default,
url_prefix: str | Default | None = _default,
version: int | (str | (float | Default)) | None = _default,
version_prefix: str | Default = _default,
allow_route_overwrite: bool | Default = _default,
strict_slashes: bool | Default | None = _default,
with_registration: bool = True,
with_ctx: bool = False,
):
@ -277,12 +271,12 @@ class Blueprint(BaseSanic):
@staticmethod
def group(
*blueprints: Union[Blueprint, BlueprintGroup],
url_prefix: Optional[str] = None,
version: Optional[Union[int, str, float]] = None,
strict_slashes: Optional[bool] = None,
*blueprints: Blueprint | BlueprintGroup,
url_prefix: str | None = None,
version: int | (str | float) | None = None,
strict_slashes: bool | None = None,
version_prefix: str = "/v",
name_prefix: Optional[str] = "",
name_prefix: str | None = "",
) -> BlueprintGroup:
"""Group multiple blueprints (or other blueprint groups) together.
@ -479,7 +473,7 @@ class Blueprint(BaseSanic):
continue
future.condition.update({"__blueprint__": self.name})
# Force exclusive to be False
app._apply_signal(tuple((*future[:-1], False)))
app._apply_signal((*future[:-1], False))
self.routes += [route for route in routes if isinstance(route, Route)]
self.websocket_routes += [
@ -516,7 +510,7 @@ class Blueprint(BaseSanic):
*[app.dispatch(*args, **kwargs) for app in self.apps]
)
def event(self, event: str, timeout: Optional[Union[int, float]] = None):
def event(self, event: str, timeout: int | float | None = None):
"""Wait for a signal event to be dispatched.
Args:
@ -550,7 +544,7 @@ class Blueprint(BaseSanic):
return value
@staticmethod
def _setup_uri(base: str, prefix: Optional[str]):
def _setup_uri(base: str, prefix: str | None):
uri = base
if prefix:
uri = prefix
@ -563,7 +557,7 @@ class Blueprint(BaseSanic):
@staticmethod
def register_futures(
apps: Set[Sanic], bp: Blueprint, futures: Sequence[Tuple[Any, ...]]
apps: set[Sanic], bp: Blueprint, futures: Sequence[tuple[Any, ...]]
):
"""Register futures to the apps.
@ -575,7 +569,7 @@ class Blueprint(BaseSanic):
"""
for app in apps:
app._future_registry.update(set((bp, item) for item in futures))
app._future_registry.update({(bp, item) for item in futures})
if sys.version_info < (3, 9):
@ -667,13 +661,13 @@ class BlueprintGroup(bpg_base):
def __init__(
self,
url_prefix: Optional[str] = None,
version: Optional[Union[int, str, float]] = None,
strict_slashes: Optional[bool] = None,
url_prefix: str | None = None,
version: int | (str | float) | None = None,
strict_slashes: bool | None = None,
version_prefix: str = "/v",
name_prefix: Optional[str] = "",
name_prefix: str | None = "",
):
self._blueprints: List[Blueprint] = []
self._blueprints: list[Blueprint] = []
self._url_prefix = url_prefix
self._version = version
self._version_prefix = version_prefix
@ -681,7 +675,7 @@ class BlueprintGroup(bpg_base):
self._name_prefix = name_prefix
@property
def url_prefix(self) -> Optional[Union[int, str, float]]:
def url_prefix(self) -> int | (str | float) | None:
"""The URL prefix for the Blueprint Group.
Returns:
@ -691,7 +685,7 @@ class BlueprintGroup(bpg_base):
return self._url_prefix
@property
def blueprints(self) -> List[Blueprint]:
def blueprints(self) -> list[Blueprint]:
"""A list of all the available blueprints under this group.
Returns:
@ -701,7 +695,7 @@ class BlueprintGroup(bpg_base):
return self._blueprints
@property
def version(self) -> Optional[Union[str, int, float]]:
def version(self) -> str | (int | float) | None:
"""API Version for the Blueprint Group, if any.
Returns:
@ -710,7 +704,7 @@ class BlueprintGroup(bpg_base):
return self._version
@property
def strict_slashes(self) -> Optional[bool]:
def strict_slashes(self) -> bool | None:
"""Whether to enforce strict slashes for the Blueprint Group.
Returns:
@ -728,7 +722,7 @@ class BlueprintGroup(bpg_base):
return self._version_prefix
@property
def name_prefix(self) -> Optional[str]:
def name_prefix(self) -> str | None:
"""Name prefix for the Blueprint Group.
This is mainly needed when blueprints are copied in order to
@ -756,8 +750,8 @@ class BlueprintGroup(bpg_base):
...
def __getitem__(
self, item: Union[int, slice]
) -> Union[Blueprint, MutableSequence[Blueprint]]:
self, item: int | slice
) -> Blueprint | MutableSequence[Blueprint]:
"""Get the Blueprint object at the specified index.
This method returns a blueprint inside the group specified by
@ -785,8 +779,8 @@ class BlueprintGroup(bpg_base):
def __setitem__(
self,
index: Union[int, slice],
item: Union[Blueprint, Iterable[Blueprint]],
index: int | slice,
item: Blueprint | Iterable[Blueprint],
) -> None:
"""Set the Blueprint object at the specified index.
@ -824,7 +818,7 @@ class BlueprintGroup(bpg_base):
def __delitem__(self, index: slice) -> None:
...
def __delitem__(self, index: Union[int, slice]) -> None:
def __delitem__(self, index: int | slice) -> None:
"""Delete the Blueprint object at the specified index.
Abstract method implemented to turn the `BlueprintGroup` class

View File

@ -196,7 +196,7 @@ Or, a path to a directory to run as a simple HTTP server:
if self.args.tlshost:
ssl.append(None)
if self.args.cert is not None or self.args.key is not None:
ssl.append(dict(cert=self.args.cert, key=self.args.key))
ssl.append({"cert": self.args.cert, "key": self.args.key})
if self.args.tls:
ssl += self.args.tls
if not ssl:

View File

@ -1,7 +1,6 @@
from __future__ import annotations
from argparse import ArgumentParser, _ArgumentGroup
from typing import List, Optional, Type, Union
from sanic_routing import __version__ as __routing_version__
@ -10,14 +9,14 @@ from sanic.http.constants import HTTP
class Group:
name: Optional[str]
container: Union[ArgumentParser, _ArgumentGroup]
_registry: List[Type[Group]] = []
name: str | None
container: ArgumentParser | _ArgumentGroup
_registry: list[type[Group]] = []
def __init_subclass__(cls) -> None:
Group._registry.append(cls)
def __init__(self, parser: ArgumentParser, title: Optional[str]):
def __init__(self, parser: ArgumentParser, title: str | None):
self.parser = parser
if title:

View File

@ -4,7 +4,7 @@ import sys
from http.client import RemoteDisconnected
from textwrap import indent
from typing import Any, Dict, Optional
from typing import Any
from urllib.error import URLError
from urllib.request import Request as URequest
from urllib.request import urlopen
@ -27,7 +27,7 @@ class InspectorClient:
port: int,
secure: bool,
raw: bool,
api_key: Optional[str],
api_key: str | None,
) -> None:
self.scheme = "https" if secure else "http"
self.host = host
@ -89,7 +89,7 @@ class InspectorClient:
def request(self, action: str, method: str = "POST", **kwargs: Any) -> Any:
url = f"{self.base_url}/{action}"
params: Dict[str, Any] = {"method": method, "headers": {}}
params: dict[str, Any] = {"method": method, "headers": {}}
if kwargs:
params["data"] = dumps(kwargs).encode()
params["headers"]["content-type"] = "application/json"

View File

@ -6,7 +6,7 @@ import sys
from contextlib import contextmanager
from enum import Enum
from typing import Awaitable, Union
from typing import Awaitable, Literal, Union
from multidict import CIMultiDict # type: ignore
@ -14,14 +14,9 @@ from sanic.helpers import Default
from sanic.log import error_logger
if sys.version_info < (3, 8): # no cov
StartMethod = Union[Default, str]
else: # no cov
from typing import Literal
StartMethod = Union[
StartMethod = Union[
Default, Literal["fork"], Literal["forkserver"], Literal["spawn"]
]
]
OS_IS_WINDOWS = os.name == "nt"
PYPY_IMPLEMENTATION = platform.python_implementation() == "PyPy"
@ -142,7 +137,7 @@ if use_trio: # pragma: no cover
return trio.Path(path).stat()
open_async = trio.open_file
CancelledErrors = tuple([asyncio.CancelledError, trio.Cancelled])
CancelledErrors = (asyncio.CancelledError, trio.Cancelled)
else:
if PYPY_IMPLEMENTATION:
pypy_os_module_patch()
@ -156,7 +151,7 @@ else:
async def open_async(file, mode="r", **kwargs):
return aio_open(file, mode, **kwargs)
CancelledErrors = tuple([asyncio.CancelledError])
CancelledErrors = (asyncio.CancelledError,)
def ctrlc_workaround_for_windows(app):

View File

@ -1,12 +1,10 @@
from __future__ import annotations
import sys
from abc import ABCMeta
from inspect import getmembers, isclass, isdatadescriptor
from os import environ
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Sequence, Union
from typing import Any, Callable, Literal, Sequence, Union
from warnings import filterwarnings
from sanic.constants import LocalCertCreator
@ -17,19 +15,14 @@ from sanic.log import error_logger
from sanic.utils import load_module_from_file_location, str_to_bool
if sys.version_info >= (3, 8):
from typing import Literal
FilterWarningType = Union[
FilterWarningType = Union[
Literal["default"],
Literal["error"],
Literal["ignore"],
Literal["always"],
Literal["module"],
Literal["once"],
]
else:
FilterWarningType = str
]
SANIC_PREFIX = "SANIC_"
@ -100,25 +93,25 @@ class Config(dict, metaclass=DescriptorMeta):
EVENT_AUTOREGISTER: bool
DEPRECATION_FILTER: FilterWarningType
FORWARDED_FOR_HEADER: str
FORWARDED_SECRET: Optional[str]
FORWARDED_SECRET: str | None
GRACEFUL_SHUTDOWN_TIMEOUT: float
INSPECTOR: bool
INSPECTOR_HOST: str
INSPECTOR_PORT: int
INSPECTOR_TLS_KEY: Union[Path, str, Default]
INSPECTOR_TLS_CERT: Union[Path, str, Default]
INSPECTOR_TLS_KEY: Path | (str | Default)
INSPECTOR_TLS_CERT: Path | (str | Default)
INSPECTOR_API_KEY: str
KEEP_ALIVE_TIMEOUT: int
KEEP_ALIVE: bool
LOCAL_CERT_CREATOR: Union[str, LocalCertCreator]
LOCAL_TLS_KEY: Union[Path, str, Default]
LOCAL_TLS_CERT: Union[Path, str, Default]
LOCAL_CERT_CREATOR: str | LocalCertCreator
LOCAL_TLS_KEY: Path | (str | Default)
LOCAL_TLS_CERT: Path | (str | Default)
LOCALHOST: str
MOTD: bool
MOTD_DISPLAY: Dict[str, str]
MOTD_DISPLAY: dict[str, str]
NOISY_EXCEPTIONS: bool
PROXIES_COUNT: Optional[int]
REAL_IP_HEADER: Optional[str]
PROXIES_COUNT: int | None
REAL_IP_HEADER: str | None
REQUEST_BUFFER_SIZE: int
REQUEST_MAX_HEADER_SIZE: int
REQUEST_ID_HEADER: str
@ -127,21 +120,19 @@ class Config(dict, metaclass=DescriptorMeta):
RESPONSE_TIMEOUT: int
SERVER_NAME: str
TLS_CERT_PASSWORD: str
TOUCHUP: Union[Default, bool]
USE_UVLOOP: Union[Default, bool]
TOUCHUP: Default | bool
USE_UVLOOP: Default | bool
WEBSOCKET_MAX_SIZE: int
WEBSOCKET_PING_INTERVAL: int
WEBSOCKET_PING_TIMEOUT: int
def __init__(
self,
defaults: Optional[
Dict[str, Union[str, bool, int, float, None]]
] = None,
env_prefix: Optional[str] = SANIC_PREFIX,
keep_alive: Optional[bool] = None,
defaults: dict[str, str | (bool | (int | (float | None)))] | None = None,
env_prefix: str | None = SANIC_PREFIX,
keep_alive: bool | None = None,
*,
converters: Optional[Sequence[Callable[[str], Any]]] = None,
converters: Sequence[Callable[[str], Any]] | None = None,
):
defaults = defaults or {}
super().__init__({**DEFAULT_CONFIG, **defaults})
@ -209,7 +200,7 @@ class Config(dict, metaclass=DescriptorMeta):
```
"""
kwargs.update({k: v for item in other for k, v in dict(item).items()})
setters: Dict[str, Any] = {
setters: dict[str, Any] = {
k: kwargs.pop(k)
for k in {**kwargs}.keys()
if k in self.__class__.__setters__
@ -276,7 +267,7 @@ class Config(dict, metaclass=DescriptorMeta):
module=r"sanic.*",
)
def _check_error_format(self, format: Optional[str] = None):
def _check_error_format(self, format: str | None = None):
check_error_format(format or self.FALLBACK_ERROR_FORMAT)
def load_environment_vars(self, prefix=SANIC_PREFIX):
@ -332,7 +323,7 @@ class Config(dict, metaclass=DescriptorMeta):
except ValueError:
pass
def update_config(self, config: Union[bytes, str, dict, Any]):
def update_config(self, config: bytes | (str | (dict | Any))):
"""Update app.config.
.. note::

View File

@ -2,10 +2,9 @@ from __future__ import annotations
import re
import string
import sys
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Union
from sanic.exceptions import ServerError
from sanic.log import deprecation
@ -14,19 +13,17 @@ from sanic.log import deprecation
if TYPE_CHECKING:
from sanic.compat import Header
if sys.version_info < (3, 8): # no cov
SameSite = str
else: # no cov
from typing import Literal
from typing import Literal
SameSite = Union[
SameSite = Union[
Literal["Strict"],
Literal["Lax"],
Literal["None"],
Literal["strict"],
Literal["lax"],
Literal["none"],
]
]
DEFAULT_MAX_AGE = 0
SAMESITE_VALUES = ("strict", "lax", "none")
@ -180,7 +177,7 @@ class CookieJar(dict):
return CookieJar.HEADER_KEY
@property
def cookie_headers(self) -> Dict[str, str]: # no cov
def cookie_headers(self) -> dict[str, str]: # no cov
"""Deprecated in v24.3"""
deprecation(
"The CookieJar.coookie_headers property has been deprecated "
@ -191,7 +188,7 @@ class CookieJar(dict):
return {key: self.header_key for key in self}
@property
def cookies(self) -> List[Cookie]:
def cookies(self) -> list[Cookie]:
"""A list of cookies in the CookieJar.
Returns:
@ -203,10 +200,10 @@ class CookieJar(dict):
self,
key: str,
path: str = "/",
domain: Optional[str] = None,
domain: str | None = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> Optional[Cookie]:
) -> Cookie | None:
"""Fetch a cookie from the CookieJar.
Args:
@ -236,7 +233,7 @@ class CookieJar(dict):
self,
key: str,
path: str = "/",
domain: Optional[str] = None,
domain: str | None = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> bool:
@ -271,14 +268,14 @@ class CookieJar(dict):
value: str,
*,
path: str = "/",
domain: Optional[str] = None,
domain: str | None = None,
secure: bool = True,
max_age: Optional[int] = None,
expires: Optional[datetime] = None,
max_age: int | None = None,
expires: datetime | None = None,
httponly: bool = False,
samesite: Optional[SameSite] = "Lax",
samesite: SameSite | None = "Lax",
partitioned: bool = False,
comment: Optional[str] = None,
comment: str | None = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> Cookie:
@ -362,7 +359,7 @@ class CookieJar(dict):
key: str,
*,
path: str = "/",
domain: Optional[str] = None,
domain: str | None = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> None:
@ -390,7 +387,7 @@ class CookieJar(dict):
:type secure_prefix: bool
"""
# remove it from header
cookies: List[Cookie] = self.headers.popall(self.HEADER_KEY, [])
cookies: list[Cookie] = self.headers.popall(self.HEADER_KEY, [])
for cookie in cookies:
if (
cookie.key != Cookie.make_key(key, host_prefix, secure_prefix)
@ -481,14 +478,14 @@ class Cookie(dict):
value: str,
*,
path: str = "/",
domain: Optional[str] = None,
domain: str | None = None,
secure: bool = True,
max_age: Optional[int] = None,
expires: Optional[datetime] = None,
max_age: int | None = None,
expires: datetime | None = None,
httponly: bool = False,
samesite: Optional[SameSite] = "Lax",
samesite: SameSite | None = "Lax",
partitioned: bool = False,
comment: Optional[str] = None,
comment: str | None = None,
host_prefix: bool = False,
secure_prefix: bool = False,
):
@ -561,7 +558,7 @@ class Cookie(dict):
# in v24.3 when this is no longer a dict
def _set_value(self, key: str, value: Any) -> None:
if key not in self._keys:
raise KeyError("Unknown cookie property: %s=%s" % (key, value))
raise KeyError(f"Unknown cookie property: {key}={value}")
if value is not None:
if key.lower() == "max-age" and not str(value).isdigit():
@ -604,7 +601,7 @@ class Cookie(dict):
def __str__(self):
"""Format as a Set-Cookie header value."""
output = ["%s=%s" % (self.key, _quote(self.value))]
output = [f"{self.key}={_quote(self.value)}"]
key_index = list(self._keys)
for key, value in sorted(
self.items(), key=lambda x: key_index.index(x[0])
@ -614,11 +611,10 @@ class Cookie(dict):
try:
output.append("%s=%d" % (self._keys[key], value))
except TypeError:
output.append("%s=%s" % (self._keys[key], value))
output.append(f"{self._keys[key]}={value}")
elif key == "expires":
output.append(
"%s=%s"
% (
"{}={}".format(
self._keys[key],
value.strftime("%a, %d-%b-%Y %T GMT"),
)
@ -626,7 +622,7 @@ class Cookie(dict):
elif key in self._flags:
output.append(self._keys[key])
else:
output.append("%s=%s" % (self._keys[key], value))
output.append(f"{self._keys[key]}={value}")
return "; ".join(output)
@ -640,7 +636,7 @@ class Cookie(dict):
self._set_value("path", value)
@property
def expires(self) -> Optional[datetime]: # no cov
def expires(self) -> datetime | None: # no cov
"""The expiration date of the cookie. Defaults to `None`."""
return self.get("expires")
@ -649,7 +645,7 @@ class Cookie(dict):
self._set_value("expires", value)
@property
def comment(self) -> Optional[str]: # no cov
def comment(self) -> str | None: # no cov
"""A comment for the cookie. Defaults to `None`."""
return self.get("comment")
@ -658,7 +654,7 @@ class Cookie(dict):
self._set_value("comment", value)
@property
def domain(self) -> Optional[str]: # no cov
def domain(self) -> str | None: # no cov
"""The domain of the cookie. Defaults to `None`."""
return self.get("domain")
@ -667,7 +663,7 @@ class Cookie(dict):
self._set_value("domain", value)
@property
def max_age(self) -> Optional[int]: # no cov
def max_age(self) -> int | None: # no cov
"""The maximum age of the cookie in seconds. Defaults to `None`."""
return self.get("max-age")
@ -694,7 +690,7 @@ class Cookie(dict):
self._set_value("httponly", value)
@property
def samesite(self) -> Optional[SameSite]: # no cov
def samesite(self) -> SameSite | None: # no cov
"""The SameSite attribute for the cookie. Defaults to `"Lax"`."""
return self.get("samesite")

View File

@ -73,7 +73,7 @@ class BaseRenderer:
self.debug = debug
@property
def headers(self) -> t.Dict[str, str]:
def headers(self) -> dict[str, str]:
"""The headers to be used for the response."""
if isinstance(self.exception, SanicException):
return getattr(self.exception, "headers", {})
@ -326,8 +326,8 @@ def exception_response(
exception: Exception,
debug: bool,
fallback: str,
base: t.Type[BaseRenderer],
renderer: t.Optional[t.Type[BaseRenderer]] = None,
base: type[BaseRenderer],
renderer: type[BaseRenderer] | None = None,
) -> HTTPResponse:
"""Render a response for the default FALLBACK exception handler."""
if not renderer:

View File

@ -622,7 +622,7 @@ class Unauthorized(HTTPException):
# if auth-scheme is specified, set "WWW-Authenticate" header
if scheme is not None:
values = [
'{!s}="{!s}"'.format(k, v) for k, v in challenges.items()
f'{k!s}="{v!s}"' for k, v in challenges.items()
]
challenge = ", ".join(values)

View File

@ -34,20 +34,20 @@ class ContentRangeHandler(Range):
unit, _, value = tuple(map(str.strip, _range.partition("=")))
if unit != "bytes":
raise InvalidRangeType(
"%s is not a valid Range Type" % (unit,), self
f"{unit} is not a valid Range Type", self
)
start_b, _, end_b = tuple(map(str.strip, value.partition("-")))
try:
self.start = int(start_b) if start_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (start_b,), self
f"'{start_b}' is invalid for Content Range", self
)
try:
self.end = int(end_b) if end_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (end_b,), self
f"'{end_b}' is invalid for Content Range", self
)
if self.end is None:
if self.start is None:
@ -68,8 +68,7 @@ class ContentRangeHandler(Range):
)
self.size = self.end - self.start + 1
self.headers = {
"Content-Range": "bytes %s-%s/%s"
% (self.start, self.end, self.total)
"Content-Range": f"bytes {self.start}-{self.end}/{self.total}"
}
def __bool__(self):

View File

@ -4,7 +4,7 @@ from datetime import datetime
from operator import itemgetter
from pathlib import Path
from stat import S_ISDIR
from typing import Dict, Iterable, Optional, Sequence, Union, cast
from typing import Iterable, Sequence, cast
from sanic.exceptions import NotFound
from sanic.pages.directory_page import DirectoryPage, FileInfo
@ -28,7 +28,7 @@ class DirectoryHandler:
uri: str,
directory: Path,
directory_view: bool = False,
index: Optional[Union[str, Sequence[str]]] = None,
index: str | Sequence[str] | None = None,
) -> None:
if isinstance(index, str):
index = [index]
@ -80,7 +80,7 @@ class DirectoryHandler:
page = DirectoryPage(self._iter_files(location), path, debug)
return html(page.render())
def _prepare_file(self, path: Path) -> Dict[str, Union[int, str]]:
def _prepare_file(self, path: Path) -> dict[str, int | str]:
stat = path.stat()
modified = (
datetime.fromtimestamp(stat.st_mtime)

View File

@ -1,7 +1,5 @@
from __future__ import annotations
from typing import Dict, List, Optional, Tuple, Type
from sanic.errorpages import BaseRenderer, TextRenderer, exception_response
from sanic.exceptions import ServerError
from sanic.log import error_logger
@ -25,20 +23,20 @@ class ErrorHandler:
def __init__(
self,
base: Type[BaseRenderer] = TextRenderer,
base: type[BaseRenderer] = TextRenderer,
):
self.cached_handlers: Dict[
Tuple[Type[BaseException], Optional[str]], Optional[RouteHandler]
self.cached_handlers: dict[
tuple[type[BaseException], str | None], RouteHandler | None
] = {}
self.debug = False
self.base = base
def _full_lookup(self, exception, route_name: Optional[str] = None):
def _full_lookup(self, exception, route_name: str | None = None):
return self.lookup(exception, route_name)
def _add(
self,
key: Tuple[Type[BaseException], Optional[str]],
key: tuple[type[BaseException], str | None],
handler: RouteHandler,
) -> None:
if key in self.cached_handlers:
@ -53,7 +51,7 @@ class ErrorHandler:
raise ServerError(message)
self.cached_handlers[key] = handler
def add(self, exception, handler, route_names: Optional[List[str]] = None):
def add(self, exception, handler, route_names: list[str] | None = None):
"""Add a new exception handler to an already existing handler object.
Args:
@ -72,7 +70,7 @@ class ErrorHandler:
else:
self._add((exception, None), handler)
def lookup(self, exception, route_name: Optional[str] = None):
def lookup(self, exception, route_name: str | None = None):
"""Lookup the existing instance of `ErrorHandler` and fetch the registered handler for a specific type of exception.
This method leverages a dict lookup to speedup the retrieval process.

View File

@ -2,7 +2,7 @@ from __future__ import annotations
import re
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
from typing import Any, Dict, Iterable, Tuple, Union
from urllib.parse import unquote
from sanic.exceptions import InvalidHeader
@ -85,8 +85,8 @@ class MediaType:
def match(
self,
mime_with_params: Union[str, MediaType],
) -> Optional[MediaType]:
mime_with_params: str | MediaType,
) -> MediaType | None:
"""Match this media type against another media type.
Check if this media type matches the given mime type/subtype.
@ -141,7 +141,7 @@ class MediaType:
return any(part == "*" for part in (self.subtype, self.type))
@classmethod
def _parse(cls, mime_with_params: str) -> Optional[MediaType]:
def _parse(cls, mime_with_params: str) -> MediaType | None:
mtype = mime_with_params.strip()
if "/" not in mime_with_params:
return None
@ -151,12 +151,10 @@ class MediaType:
if not type_ or not subtype:
raise ValueError(f"Invalid media type: {mtype}")
params = dict(
[
(key.strip(), value.strip())
params = {
key.strip(): value.strip()
for key, value in (param.split("=", 1) for param in raw_params)
]
)
}
return cls(type_.lstrip(), subtype.rstrip(), **params)
@ -173,7 +171,7 @@ class Matched:
header (MediaType): The header to match against, if any.
"""
def __init__(self, mime: str, header: Optional[MediaType]):
def __init__(self, mime: str, header: MediaType | None):
self.mime = mime
self.header = header
@ -200,7 +198,7 @@ class Matched:
)
)
def _compare(self, other) -> Tuple[bool, Matched]:
def _compare(self, other) -> tuple[bool, Matched]:
if isinstance(other, str):
parsed = Matched.parse(other)
if self.mime == other:
@ -215,7 +213,7 @@ class Matched:
f"mime types of '{self.mime}' and '{other}'"
)
def match(self, other: Union[str, Matched]) -> Optional[Matched]:
def match(self, other: str | Matched) -> Matched | None:
"""Match this MIME string against another MIME string.
Check if this MIME string matches the given MIME string. Wildcards are supported both ways on both type and subtype.
@ -296,7 +294,7 @@ class AcceptList(list):
return ", ".join(str(m) for m in self)
def parse_accept(accept: Optional[str]) -> AcceptList:
def parse_accept(accept: str | None) -> AcceptList:
"""Parse an Accept header and order the acceptable media types according to RFC 7231, s. 5.3.2
https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2
@ -327,7 +325,7 @@ def parse_accept(accept: Optional[str]) -> AcceptList:
raise InvalidHeader(f"Invalid header value in Accept: {accept}")
def parse_content_header(value: str) -> Tuple[str, Options]:
def parse_content_header(value: str) -> tuple[str, Options]:
"""Parse content-type and content-disposition header values.
E.g. `form-data; name=upload; filename="file.txt"` to
@ -346,7 +344,7 @@ def parse_content_header(value: str) -> Tuple[str, Options]:
"""
pos = value.find(";")
if pos == -1:
options: Dict[str, Union[int, str]] = {}
options: dict[str, int | str] = {}
else:
options = {
m.group(1).lower(): (m.group(2) or m.group(3))
@ -366,7 +364,7 @@ def parse_content_header(value: str) -> Tuple[str, Options]:
_rparam = re.compile(f"(?:{_token}|{_quoted})={_token}\\s*($|[;,])", re.ASCII)
def parse_forwarded(headers, config) -> Optional[Options]:
def parse_forwarded(headers, config) -> Options | None:
"""Parse RFC 7239 Forwarded headers.
The value of `by` or `secret` must match `config.FORWARDED_SECRET`
:return: dict with keys and values, or None if nothing matched
@ -380,7 +378,7 @@ def parse_forwarded(headers, config) -> Optional[Options]:
return None
# Loop over <separator><key>=<value> elements from right to left
sep = pos = None
options: List[Tuple[str, str]] = []
options: list[tuple[str, str]] = []
found = False
for m in _rparam.finditer(header[::-1]):
# Start of new element? (on parser skips and non-semicolon right sep)
@ -404,7 +402,7 @@ def parse_forwarded(headers, config) -> Optional[Options]:
return fwd_normalize(reversed(options)) if found else None
def parse_xforwarded(headers, config) -> Optional[Options]:
def parse_xforwarded(headers, config) -> Options | None:
"""Parse traditional proxy headers."""
real_ip_header = config.REAL_IP_HEADER
proxies_count = config.PROXIES_COUNT
@ -451,7 +449,7 @@ def fwd_normalize(fwd: OptionsIterable) -> Options:
Returns:
Options: A dict of normalized key-value pairs.
"""
ret: Dict[str, Union[int, str]] = {}
ret: dict[str, int | str] = {}
for key, val in fwd:
if val is not None:
try:
@ -488,7 +486,7 @@ def fwd_normalize_address(addr: str) -> str:
return addr.lower()
def parse_host(host: str) -> Tuple[Optional[str], Optional[int]]:
def parse_host(host: str) -> tuple[str | None, int | None]:
"""Split host:port into hostname and port.
Args:
@ -530,9 +528,9 @@ def format_http1_response(status: int, headers: HeaderBytesIterable) -> bytes:
def parse_credentials(
header: Optional[str],
prefixes: Optional[Union[List, Tuple, Set]] = None,
) -> Tuple[Optional[str], Optional[str]]:
header: str | None,
prefixes: list | (tuple | set) | None = None,
) -> tuple[str | None, str | None]:
"""Parses any header with the aim to retrieve any credentials from it.
Args:

View File

@ -132,7 +132,7 @@ def remove_entity_headers(headers, allowed=("content-location", "expires")):
returns the headers without the entity headers
"""
allowed = set([h.lower() for h in allowed])
allowed = {h.lower() for h in allowed}
headers = {
header: value
for header, value in headers.items()

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
@ -481,7 +481,7 @@ class Http(Stream, metaclass=TouchUpMeta):
if data:
yield data
async def read(self) -> Optional[bytes]: # no cov
async def read(self) -> bytes | None: # no cov
"""Read some bytes of request body."""
# Send a 100-continue if needed

View File

@ -8,10 +8,6 @@ from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Tuple,
Union,
cast,
)
@ -109,11 +105,11 @@ class HTTPReceiver(Receiver, Stream):
self.request_body = None
self.stage = Stage.IDLE
self.headers_sent = False
self.response: Optional[BaseHTTPResponse] = None
self.response: BaseHTTPResponse | None = None
self.request_max_size = self.protocol.request_max_size
self.request_bytes = 0
async def run(self, exception: Optional[Exception] = None):
async def run(self, exception: Exception | None = None):
"""Handle the request and response cycle."""
self.stage = Stage.HANDLER
self.head_only = self.request.method.upper() == "HEAD"
@ -148,7 +144,7 @@ class HTTPReceiver(Receiver, Stream):
def _prepare_headers(
self, response: BaseHTTPResponse
) -> List[Tuple[bytes, bytes]]:
) -> list[tuple[bytes, bytes]]:
size = len(response.body) if response.body else 0
headers = response.headers
status = response.status
@ -304,7 +300,7 @@ class Http3:
) -> None:
self.protocol = protocol
self.transmit = transmit
self.receivers: Dict[int, Receiver] = {}
self.receivers: dict[int, Receiver] = {}
def http_event_received(self, event: H3Event) -> None:
logger.debug( # no cov
@ -330,7 +326,7 @@ class Http3:
extra={"verbosity": 2},
)
def get_or_make_receiver(self, event: H3Event) -> Tuple[Receiver, bool]:
def get_or_make_receiver(self, event: H3Event) -> tuple[Receiver, bool]:
if (
isinstance(event, HeadersReceived)
and event.stream_id not in self.receivers
@ -396,17 +392,17 @@ class SessionTicketStore:
"""
def __init__(self) -> None:
self.tickets: Dict[bytes, SessionTicket] = {}
self.tickets: dict[bytes, SessionTicket] = {}
def add(self, ticket: SessionTicket) -> None:
self.tickets[ticket.ticket] = ticket
def pop(self, label: bytes) -> Optional[SessionTicket]:
def pop(self, label: bytes) -> SessionTicket | None:
return self.tickets.pop(label, None)
def get_config(
app: Sanic, ssl: Union[SanicSSLContext, CertSelector, SSLContext]
app: Sanic, ssl: SanicSSLContext | (CertSelector | SSLContext)
):
# TODO:
# - proper selection needed if service with multiple certs insted of

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Tuple, Union
from typing import TYPE_CHECKING
from sanic.http.constants import Stage
@ -12,13 +12,13 @@ if TYPE_CHECKING:
class Stream:
stage: Stage
response: Optional[BaseHTTPResponse]
response: BaseHTTPResponse | None
protocol: HttpProtocol
url: Optional[str]
request_body: Optional[bytes]
request_max_size: Union[int, float]
url: str | None
request_body: bytes | None
request_max_size: int | float
__touchup__: Tuple[str, ...] = tuple()
__touchup__: tuple[str, ...] = ()
__slots__ = ("request_max_size",)
def respond(

View File

@ -3,7 +3,7 @@ from __future__ import annotations
import os
import ssl
from typing import Any, Dict, Iterable, Optional, Union
from typing import Any, Iterable
from sanic.log import logger
@ -21,9 +21,9 @@ CIPHERS_TLS12 = [
def create_context(
certfile: Optional[str] = None,
keyfile: Optional[str] = None,
password: Optional[str] = None,
certfile: str | None = None,
keyfile: str | None = None,
password: str | None = None,
purpose: ssl.Purpose = ssl.Purpose.CLIENT_AUTH,
) -> ssl.SSLContext:
"""Create a context with secure crypto and HTTP/1.1 in protocols."""
@ -39,8 +39,8 @@ def create_context(
def shorthand_to_ctx(
ctxdef: Union[None, ssl.SSLContext, dict, str]
) -> Optional[ssl.SSLContext]:
ctxdef: None | (ssl.SSLContext | (dict | str))
) -> ssl.SSLContext | None:
"""Convert an ssl argument shorthand to an SSLContext object."""
if ctxdef is None or isinstance(ctxdef, ssl.SSLContext):
return ctxdef
@ -55,8 +55,8 @@ def shorthand_to_ctx(
def process_to_context(
ssldef: Union[None, ssl.SSLContext, dict, str, list, tuple]
) -> Optional[ssl.SSLContext]:
ssldef: None | (ssl.SSLContext | (dict | (str | (list | tuple))))
) -> ssl.SSLContext | None:
"""Process app.run ssl argument from easy formats to full SSLContext."""
return (
CertSelector(map(shorthand_to_ctx, ssldef))
@ -101,7 +101,7 @@ def find_cert(self: CertSelector, server_name: str):
def match_hostname(
ctx: Union[ssl.SSLContext, CertSelector], hostname: str
ctx: ssl.SSLContext | CertSelector, hostname: str
) -> bool:
"""Match names from CertSelector against a received hostname."""
# Local certs are considered trusted, so this can be less pedantic
@ -119,7 +119,7 @@ def match_hostname(
def selector_sni_callback(
sslobj: ssl.SSLObject, server_name: str, ctx: CertSelector
) -> Optional[int]:
) -> int | None:
"""Select a certificate matching the SNI."""
# Call server_name_callback to store the SNI on sslobj
server_name_callback(sslobj, server_name, ctx)
@ -142,7 +142,7 @@ def server_name_callback(
class SanicSSLContext(ssl.SSLContext):
sanic: Dict[str, os.PathLike]
sanic: dict[str, os.PathLike]
@classmethod
def create_from_ssl_context(cls, context: ssl.SSLContext):
@ -153,7 +153,7 @@ class SanicSSLContext(ssl.SSLContext):
class CertSimple(SanicSSLContext):
"""A wrapper for creating SSLContext with a sanic attribute."""
sanic: Dict[str, Any]
sanic: dict[str, Any]
def __new__(cls, cert, key, **kw):
# try common aliases, rename to cert/key
@ -190,7 +190,7 @@ class CertSelector(ssl.SSLContext):
def __new__(cls, ctxs):
return super().__new__(cls)
def __init__(self, ctxs: Iterable[Optional[ssl.SSLContext]]):
def __init__(self, ctxs: Iterable[ssl.SSLContext | None]):
super().__init__()
self.sni_callback = selector_sni_callback # type: ignore
self.sanic_select = []

View File

@ -9,7 +9,7 @@ from contextlib import suppress
from pathlib import Path
from tempfile import mkdtemp
from types import ModuleType
from typing import TYPE_CHECKING, Optional, Tuple, Type, Union, cast
from typing import TYPE_CHECKING, cast
from sanic.application.constants import Mode
from sanic.application.spinner import loading
@ -47,7 +47,7 @@ CIPHERS_TLS12 = [
]
def _make_path(maybe_path: Union[Path, str], tmpdir: Optional[Path]) -> Path:
def _make_path(maybe_path: Path | str, tmpdir: Path | None) -> Path:
if isinstance(maybe_path, Path):
return maybe_path
else:
@ -61,7 +61,7 @@ def _make_path(maybe_path: Union[Path, str], tmpdir: Optional[Path]) -> Path:
def get_ssl_context(
app: Sanic, ssl: Optional[ssl.SSLContext]
app: Sanic, ssl: ssl.SSLContext | None
) -> ssl.SSLContext:
if ssl:
return ssl
@ -126,10 +126,10 @@ class CertCreator(ABC):
local_tls_key,
local_tls_cert,
) -> CertCreator:
creator: Optional[CertCreator] = None
creator: CertCreator | None = None
cert_creator_options: Tuple[
Tuple[Type[CertCreator], LocalCertCreator], ...
cert_creator_options: tuple[
tuple[type[CertCreator], LocalCertCreator], ...
] = (
(MkcertCreator, LocalCertCreator.MKCERT),
(TrustmeCreator, LocalCertCreator.TRUSTME),
@ -160,8 +160,8 @@ class CertCreator(ABC):
@staticmethod
def _try_select(
app: Sanic,
creator: Optional[CertCreator],
creator_class: Type[CertCreator],
creator: CertCreator | None,
creator_class: type[CertCreator],
creator_requirement: LocalCertCreator,
creator_requested: LocalCertCreator,
local_tls_key,

View File

@ -19,10 +19,10 @@ else:
from enum import StrEnum
LOGGING_CONFIG_DEFAULTS: Dict[str, Any] = dict( # no cov
version=1,
disable_existing_loggers=False,
loggers={
LOGGING_CONFIG_DEFAULTS: Dict[str, Any] = { # no cov
"version": 1,
"disable_existing_loggers": False,
"loggers": {
"sanic.root": {"level": "INFO", "handlers": ["console"]},
"sanic.error": {
"level": "INFO",
@ -43,7 +43,7 @@ LOGGING_CONFIG_DEFAULTS: Dict[str, Any] = dict( # no cov
"qualname": "sanic.server",
},
},
handlers={
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "generic",
@ -60,7 +60,7 @@ LOGGING_CONFIG_DEFAULTS: Dict[str, Any] = dict( # no cov
"stream": sys.stdout,
},
},
formatters={
"formatters": {
"generic": {
"format": "%(asctime)s [%(process)s] [%(levelname)s] %(message)s",
"datefmt": "[%Y-%m-%d %H:%M:%S %z]",
@ -73,7 +73,7 @@ LOGGING_CONFIG_DEFAULTS: Dict[str, Any] = dict( # no cov
"class": "logging.Formatter",
},
},
)
}
"""
Defult logging configuration
"""

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from collections import deque
from enum import IntEnum, auto
from itertools import count
from typing import Deque, Sequence, Union
from typing import Sequence
from sanic.models.handler_types import MiddlewareType
@ -69,9 +69,9 @@ class Middleware:
@classmethod
def convert(
cls,
*middleware_collections: Sequence[Union[Middleware, MiddlewareType]],
*middleware_collections: Sequence[Middleware | MiddlewareType],
location: MiddlewareLocation,
) -> Deque[Middleware]:
) -> deque[Middleware]:
"""Convert middleware collections to a deque of Middleware objects.
Args:

View File

@ -1,7 +1,7 @@
from __future__ import annotations
from enum import Enum
from typing import Any, Callable, Coroutine, Dict, Optional, Set, Union
from typing import Any, Callable, Coroutine
from sanic.base.meta import SanicMeta
from sanic.models.futures import FutureSignal
@ -12,17 +12,17 @@ from sanic.types import HashableDict
class SignalMixin(metaclass=SanicMeta):
def __init__(self, *args, **kwargs) -> None:
self._future_signals: Set[FutureSignal] = set()
self._future_signals: set[FutureSignal] = set()
def _apply_signal(self, signal: FutureSignal) -> Signal:
raise NotImplementedError # noqa
def signal(
self,
event: Union[str, Enum],
event: str | Enum,
*,
apply: bool = True,
condition: Optional[Dict[str, Any]] = None,
condition: dict[str, Any] | None = None,
exclusive: bool = True,
) -> Callable[[SignalHandler], SignalHandler]:
"""
@ -64,9 +64,9 @@ class SignalMixin(metaclass=SanicMeta):
def add_signal(
self,
handler: Optional[Callable[..., Any]],
handler: Callable[..., Any] | None,
event: str,
condition: Optional[Dict[str, Any]] = None,
condition: dict[str, Any] | None = None,
exclusive: bool = True,
) -> Callable[..., Any]:
"""Registers a signal handler for a specific event.

View File

@ -2,7 +2,6 @@ from __future__ import annotations
import os
import platform
import sys
from asyncio import (
AbstractEventLoop,
@ -32,13 +31,7 @@ from typing import (
Any,
Callable,
ClassVar,
Dict,
List,
Mapping,
Optional,
Set,
Tuple,
Type,
Union,
cast,
)
@ -79,20 +72,18 @@ if TYPE_CHECKING:
SANIC_PACKAGES = ("sanic-routing", "sanic-testing", "sanic-ext")
if sys.version_info < (3, 8): # no cov
HTTPVersion = Union[HTTP, int]
else: # no cov
from typing import Literal
from typing import Literal
HTTPVersion = Union[HTTP, Literal[1], Literal[3]]
HTTPVersion = Union[HTTP, Literal[1], Literal[3]]
class StartupMixin(metaclass=SanicMeta):
_app_registry: ClassVar[Dict[str, Sanic]]
_app_registry: ClassVar[dict[str, Sanic]]
asgi: bool
config: Config
listeners: Dict[str, List[ListenerType[Any]]]
listeners: dict[str, list[ListenerType[Any]]]
state: ApplicationState
websocket_enabled: bool
multiplexer: WorkerMultiplexer
@ -159,28 +150,28 @@ class StartupMixin(metaclass=SanicMeta):
def run(
self,
host: Optional[str] = None,
port: Optional[int] = None,
host: str | None = None,
port: int | None = None,
*,
dev: bool = False,
debug: bool = False,
auto_reload: Optional[bool] = None,
auto_reload: bool | None = None,
version: HTTPVersion = HTTP.VERSION_1,
ssl: Union[None, SSLContext, dict, str, list, tuple] = None,
sock: Optional[socket] = None,
ssl: None | (SSLContext | (dict | (str | (list | tuple)))) = None,
sock: socket | None = None,
workers: int = 1,
protocol: Optional[Type[Protocol]] = None,
protocol: type[Protocol] | None = None,
backlog: int = 100,
register_sys_signals: bool = True,
access_log: Optional[bool] = None,
unix: Optional[str] = None,
loop: Optional[AbstractEventLoop] = None,
reload_dir: Optional[Union[List[str], str]] = None,
noisy_exceptions: Optional[bool] = None,
access_log: bool | None = None,
unix: str | None = None,
loop: AbstractEventLoop | None = None,
reload_dir: list[str] | str | None = None,
noisy_exceptions: bool | None = None,
motd: bool = True,
fast: bool = False,
verbosity: int = 0,
motd_display: Optional[Dict[str, str]] = None,
motd_display: dict[str, str] | None = None,
auto_tls: bool = False,
single_process: bool = False,
) -> None:
@ -289,28 +280,28 @@ class StartupMixin(metaclass=SanicMeta):
def prepare(
self,
host: Optional[str] = None,
port: Optional[int] = None,
host: str | None = None,
port: int | None = None,
*,
dev: bool = False,
debug: bool = False,
auto_reload: Optional[bool] = None,
auto_reload: bool | None = None,
version: HTTPVersion = HTTP.VERSION_1,
ssl: Union[None, SSLContext, dict, str, list, tuple] = None,
sock: Optional[socket] = None,
ssl: None | (SSLContext | (dict | (str | (list | tuple)))) = None,
sock: socket | None = None,
workers: int = 1,
protocol: Optional[Type[Protocol]] = None,
protocol: type[Protocol] | None = None,
backlog: int = 100,
register_sys_signals: bool = True,
access_log: Optional[bool] = None,
unix: Optional[str] = None,
loop: Optional[AbstractEventLoop] = None,
reload_dir: Optional[Union[List[str], str]] = None,
noisy_exceptions: Optional[bool] = None,
access_log: bool | None = None,
unix: str | None = None,
loop: AbstractEventLoop | None = None,
reload_dir: list[str] | str | None = None,
noisy_exceptions: bool | None = None,
motd: bool = True,
fast: bool = False,
verbosity: int = 0,
motd_display: Optional[Dict[str, str]] = None,
motd_display: dict[str, str] | None = None,
coffee: bool = False,
auto_tls: bool = False,
single_process: bool = False,
@ -471,20 +462,20 @@ class StartupMixin(metaclass=SanicMeta):
async def create_server(
self,
host: Optional[str] = None,
port: Optional[int] = None,
host: str | None = None,
port: int | None = None,
*,
debug: bool = False,
ssl: Union[None, SSLContext, dict, str, list, tuple] = None,
sock: Optional[socket] = None,
protocol: Optional[Type[Protocol]] = None,
ssl: None | (SSLContext | (dict | (str | (list | tuple)))) = None,
sock: socket | None = None,
protocol: type[Protocol] | None = None,
backlog: int = 100,
access_log: Optional[bool] = None,
unix: Optional[str] = None,
access_log: bool | None = None,
unix: str | None = None,
return_asyncio_server: bool = True,
asyncio_server_kwargs: Optional[Dict[str, Any]] = None,
noisy_exceptions: Optional[bool] = None,
) -> Optional[AsyncioServer]:
asyncio_server_kwargs: dict[str, Any] | None = None,
noisy_exceptions: bool | None = None,
) -> AsyncioServer | None:
"""
Low level API for creating a Sanic Server instance.
@ -637,21 +628,21 @@ class StartupMixin(metaclass=SanicMeta):
def _helper(
self,
host: Optional[str] = None,
port: Optional[int] = None,
host: str | None = None,
port: int | None = None,
debug: bool = False,
version: HTTPVersion = HTTP.VERSION_1,
ssl: Union[None, SSLContext, dict, str, list, tuple] = None,
sock: Optional[socket] = None,
unix: Optional[str] = None,
ssl: None | (SSLContext | (dict | (str | (list | tuple)))) = None,
sock: socket | None = None,
unix: str | None = None,
workers: int = 1,
loop: Optional[AbstractEventLoop] = None,
protocol: Type[Protocol] = HttpProtocol,
loop: AbstractEventLoop | None = None,
protocol: type[Protocol] = HttpProtocol,
backlog: int = 100,
register_sys_signals: bool = True,
run_async: bool = False,
auto_tls: bool = False,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Helper function used by `run` and `create_server`."""
if self.config.PROXIES_COUNT and self.config.PROXIES_COUNT < 0:
raise ValueError(
@ -726,7 +717,7 @@ class StartupMixin(metaclass=SanicMeta):
def motd(
self,
server_settings: Optional[Dict[str, Any]] = None,
server_settings: dict[str, Any] | None = None,
) -> None:
"""Outputs the message of the day (MOTD).
@ -755,8 +746,8 @@ class StartupMixin(metaclass=SanicMeta):
MOTD.output(logo, serve_location, display, extra)
def get_motd_data(
self, server_settings: Optional[Dict[str, Any]] = None
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
self, server_settings: dict[str, Any] | None = None
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Retrieves the message of the day (MOTD) data.
Args:
@ -845,7 +836,7 @@ class StartupMixin(metaclass=SanicMeta):
@staticmethod
def get_server_location(
server_settings: Optional[Dict[str, Any]] = None
server_settings: dict[str, Any] | None = None
) -> str:
"""Using the server settings, retrieve the server location.
@ -880,11 +871,11 @@ class StartupMixin(metaclass=SanicMeta):
@staticmethod
def get_address(
host: Optional[str],
port: Optional[int],
host: str | None,
port: int | None,
version: HTTPVersion = HTTP.VERSION_1,
auto_tls: bool = False,
) -> Tuple[str, int]:
) -> tuple[str, int]:
"""Retrieve the host address and port, with default values based on the given parameters.
Args:
@ -942,10 +933,10 @@ class StartupMixin(metaclass=SanicMeta):
@classmethod
def serve(
cls,
primary: Optional[Sanic] = None,
primary: Sanic | None = None,
*,
app_loader: Optional[AppLoader] = None,
factory: Optional[Callable[[], Sanic]] = None,
app_loader: AppLoader | None = None,
factory: Callable[[], Sanic] | None = None,
) -> None:
"""Serve one or more Sanic applications.
@ -1040,7 +1031,7 @@ class StartupMixin(metaclass=SanicMeta):
primary_server_info.settings["run_multiple"] = True
monitor_sub, monitor_pub = Pipe(True)
worker_state: Mapping[str, Any] = sync_manager.dict()
kwargs: Dict[str, Any] = {
kwargs: dict[str, Any] = {
**primary_server_info.settings,
"monitor_publisher": monitor_pub,
"worker_state": worker_state,
@ -1092,7 +1083,7 @@ class StartupMixin(metaclass=SanicMeta):
worker_state,
)
if cls.should_auto_reload():
reload_dirs: Set[Path] = primary.state.reload_dirs.union(
reload_dirs: set[Path] = primary.state.reload_dirs.union(
*(app.state.reload_dirs for app in apps)
)
reloader = Reloader(monitor_pub, 0, reload_dirs, app_loader)
@ -1164,7 +1155,7 @@ class StartupMixin(metaclass=SanicMeta):
os._exit(exit_code)
@classmethod
def serve_single(cls, primary: Optional[Sanic] = None) -> None:
def serve_single(cls, primary: Sanic | None = None) -> None:
"""Serve a single process of a Sanic application.
Similar to `serve`, but only serves a single process. When used,
@ -1263,7 +1254,7 @@ class StartupMixin(metaclass=SanicMeta):
self,
primary: Sanic,
_,
apps: List[Sanic],
apps: list[Sanic],
) -> None:
for app in apps:
if (
@ -1308,7 +1299,7 @@ class StartupMixin(metaclass=SanicMeta):
if not server_info.settings["loop"]:
server_info.settings["loop"] = get_running_loop()
serve_args: Dict[str, Any] = {
serve_args: dict[str, Any] = {
**server_info.settings,
"run_async": True,
"reuse_port": bool(primary.state.workers - 1),

View File

@ -2,15 +2,14 @@ from __future__ import annotations
from base64 import b64decode
from dataclasses import dataclass, field
from typing import Optional
@dataclass()
class Credentials:
auth_type: Optional[str]
token: Optional[str]
_username: Optional[str] = field(default=None)
_password: Optional[str] = field(default=None)
auth_type: str | None
token: str | None
_username: str | None = field(default=None)
_password: str | None = field(default=None)
def __post_init__(self):
if self._auth_is_basic:

View File

@ -1,9 +1,7 @@
from __future__ import annotations
import sys
from asyncio import BaseTransport
from typing import TYPE_CHECKING, Any, AnyStr, Optional
from typing import TYPE_CHECKING, AnyStr
if TYPE_CHECKING:
@ -11,25 +9,21 @@ if TYPE_CHECKING:
from sanic.models.asgi import ASGIScope
if sys.version_info < (3, 8):
Range = Any
HTMLProtocol = Any
else:
# Protocol is a 3.8+ feature
from typing import Protocol
from typing import Protocol
class HTMLProtocol(Protocol):
class HTMLProtocol(Protocol):
def __html__(self) -> AnyStr:
...
def _repr_html_(self) -> AnyStr:
...
class Range(Protocol):
start: Optional[int]
end: Optional[int]
size: Optional[int]
total: Optional[int]
class Range(Protocol):
start: int | None
end: int | None
size: int | None
total: int | None
__slots__ = ()

View File

@ -2,7 +2,7 @@ from __future__ import annotations
from ssl import SSLContext, SSLObject
from types import SimpleNamespace
from typing import Any, Dict, List, Optional, Tuple
from typing import Any
from sanic.models.protocol_types import TransportProtocol
@ -35,17 +35,17 @@ class ConnInfo:
def __init__(self, transport: TransportProtocol, unix=None):
self.ctx = SimpleNamespace()
self.lost = False
self.peername: Optional[Tuple[str, int]] = None
self.peername: tuple[str, int] | None = None
self.server = self.client = ""
self.server_port = self.client_port = 0
self.client_ip = ""
self.sockname = addr = transport.get_extra_info("sockname")
self.ssl = False
self.server_name = ""
self.cert: Dict[str, Any] = {}
self.network_paths: List[Any] = []
sslobj: Optional[SSLObject] = transport.get_extra_info("ssl_object") # type: ignore
sslctx: Optional[SSLContext] = transport.get_extra_info("ssl_context") # type: ignore
self.cert: dict[str, Any] = {}
self.network_paths: list[Any] = []
sslobj: SSLObject | None = transport.get_extra_info("ssl_object") # type: ignore
sslctx: SSLContext | None = transport.get_extra_info("ssl_context") # type: ignore
if sslobj:
self.ssl = True
self.server_name = getattr(sslobj, "sanic_server_name", None) or ""

View File

@ -1,19 +1,12 @@
import sys
from typing import Dict, Iterable
from typing import Iterable, TypedDict
from html5tagger import E
from .base import BasePage
if sys.version_info < (3, 8): # no cov
FileInfo = Dict
else:
from typing import TypedDict
class FileInfo(TypedDict):
class FileInfo(TypedDict):
"""Type for file info."""
icon: str

View File

@ -1,12 +1,12 @@
from __future__ import annotations
from typing import Any, Optional
from typing import Any
class RequestParameters(dict):
"""Hosts a dict with lists as values where get returns the first value of the list and getlist returns the whole shebang""" # noqa: E501
def get(self, name: str, default: Optional[Any] = None) -> Optional[Any]:
def get(self, name: str, default: Any | None = None) -> Any | None:
"""Return the first value, either the default or actual
Args:
@ -19,8 +19,8 @@ class RequestParameters(dict):
return super().get(name, [default])[0]
def getlist(
self, name: str, default: Optional[Any] = None
) -> Optional[Any]:
self, name: str, default: Any | None = None
) -> Any | None:
"""Return the entire list
Args:

View File

@ -7,13 +7,7 @@ from types import SimpleNamespace
from typing import (
TYPE_CHECKING,
Any,
DefaultDict,
Dict,
Generic,
List,
Optional,
Tuple,
Union,
cast,
)
@ -161,8 +155,8 @@ class Request(Generic[sanic_type, ctx_type]):
except HttpParserInvalidURLError:
url = url_bytes.decode(errors="backslashreplace")
raise BadURL(f"Bad URL: {url}")
self._id: Optional[Union[uuid.UUID, str, int]] = None
self._name: Optional[str] = None
self._id: uuid.UUID | (str | int) | None = None
self._name: str | None = None
self._stream_id = stream_id
self.app = app
@ -174,29 +168,29 @@ class Request(Generic[sanic_type, ctx_type]):
# Init but do not inhale
self.body = b""
self.conn_info: Optional[ConnInfo] = None
self._ctx: Optional[ctx_type] = None
self.parsed_accept: Optional[AcceptList] = None
self.parsed_args: DefaultDict[
Tuple[bool, bool, str, str], RequestParameters
self.conn_info: ConnInfo | None = None
self._ctx: ctx_type | None = None
self.parsed_accept: AcceptList | None = None
self.parsed_args: defaultdict[
tuple[bool, bool, str, str], RequestParameters
] = defaultdict(RequestParameters)
self.parsed_cookies: Optional[RequestParameters] = None
self.parsed_credentials: Optional[Credentials] = None
self.parsed_files: Optional[RequestParameters] = None
self.parsed_form: Optional[RequestParameters] = None
self.parsed_forwarded: Optional[Options] = None
self.parsed_cookies: RequestParameters | None = None
self.parsed_credentials: Credentials | None = None
self.parsed_files: RequestParameters | None = None
self.parsed_form: RequestParameters | None = None
self.parsed_forwarded: Options | None = None
self.parsed_json = None
self.parsed_not_grouped_args: DefaultDict[
Tuple[bool, bool, str, str], List[Tuple[str, str]]
self.parsed_not_grouped_args: defaultdict[
tuple[bool, bool, str, str], list[tuple[str, str]]
] = defaultdict(list)
self.parsed_token: Optional[str] = None
self.parsed_token: str | None = None
self._request_middleware_started = False
self._response_middleware_started = False
self.responded: bool = False
self.route: Optional[Route] = None
self.stream: Optional[Stream] = None
self._match_info: Dict[str, Any] = {}
self._protocol: Optional[BaseProtocol] = None
self.route: Route | None = None
self.stream: Stream | None = None
self._match_info: dict[str, Any] = {}
self._protocol: BaseProtocol | None = None
def __repr__(self):
class_name = self.__class__.__name__
@ -251,7 +245,7 @@ class Request(Generic[sanic_type, ctx_type]):
return request
@classmethod
def generate_id(*_) -> Union[uuid.UUID, str, int]:
def generate_id(*_) -> uuid.UUID | (str | int):
"""Generate a unique ID for the request.
This method is called to generate a unique ID for each request.
@ -320,11 +314,11 @@ class Request(Generic[sanic_type, ctx_type]):
async def respond(
self,
response: Optional[BaseHTTPResponse] = None,
response: BaseHTTPResponse | None = None,
*,
status: int = 200,
headers: Optional[Union[Header, Dict[str, str]]] = None,
content_type: Optional[str] = None,
headers: Header | dict[str, str] | None = None,
content_type: str | None = None,
):
"""Respond to the request without returning.
@ -424,7 +418,7 @@ class Request(Generic[sanic_type, ctx_type]):
self.body = b"".join([data async for data in self.stream])
@property
def name(self) -> Optional[str]:
def name(self) -> str | None:
"""The route name
In the following pattern:
@ -443,7 +437,7 @@ class Request(Generic[sanic_type, ctx_type]):
return None
@property
def endpoint(self) -> Optional[str]:
def endpoint(self) -> str | None:
"""Alias of `sanic.request.Request.name`
Returns:
@ -452,7 +446,7 @@ class Request(Generic[sanic_type, ctx_type]):
return self.name
@property
def uri_template(self) -> Optional[str]:
def uri_template(self) -> str | None:
"""The defined URI template
Returns:
@ -494,7 +488,7 @@ class Request(Generic[sanic_type, ctx_type]):
return bytes(reqline)
@property
def id(self) -> Optional[Union[uuid.UUID, str, int]]:
def id(self) -> uuid.UUID | (str | int) | None:
"""A request ID passed from the client, or generated from the backend.
By default, this will look in a request header defined at:
@ -593,7 +587,7 @@ class Request(Generic[sanic_type, ctx_type]):
return self.parsed_accept
@property
def token(self) -> Optional[str]:
def token(self) -> str | None:
"""Attempt to return the auth header token.
Returns:
@ -608,7 +602,7 @@ class Request(Generic[sanic_type, ctx_type]):
return self.parsed_token
@property
def credentials(self) -> Optional[Credentials]:
def credentials(self) -> Credentials | None:
"""Attempt to return the auth header value.
Covers NoAuth, Basic Auth, Bearer Token, Api Token authentication
@ -633,7 +627,7 @@ class Request(Generic[sanic_type, ctx_type]):
def get_form(
self, keep_blank_values: bool = False
) -> Optional[RequestParameters]:
) -> RequestParameters | None:
"""Method to extract and parse the form data from a request.
Args:
@ -670,7 +664,7 @@ class Request(Generic[sanic_type, ctx_type]):
return self.parsed_form
@property
def form(self) -> Optional[RequestParameters]:
def form(self) -> RequestParameters | None:
"""The request body parsed as form data
Returns:
@ -682,7 +676,7 @@ class Request(Generic[sanic_type, ctx_type]):
return self.parsed_form
@property
def files(self) -> Optional[RequestParameters]:
def files(self) -> RequestParameters | None:
"""The request body parsed as uploaded files
Returns:
@ -836,7 +830,7 @@ class Request(Generic[sanic_type, ctx_type]):
return self.headers.getone("content-type", DEFAULT_HTTP_CONTENT_TYPE)
@property
def match_info(self) -> Dict[str, Any]:
def match_info(self) -> dict[str, Any]:
"""Matched path parameters after resolving route
Returns:
@ -867,7 +861,7 @@ class Request(Generic[sanic_type, ctx_type]):
return self.conn_info.client_port if self.conn_info else 0
@property
def socket(self) -> Union[Tuple[str, int], Tuple[None, None]]:
def socket(self) -> tuple[str, int] | tuple[None, None]:
"""Information about the connected socket if available
Returns:
@ -891,7 +885,7 @@ class Request(Generic[sanic_type, ctx_type]):
return self._parsed_url.path.decode("utf-8")
@property
def network_paths(self) -> Optional[List[Any]]:
def network_paths(self) -> list[Any] | None:
"""Access the network paths if available
Returns:

View File

@ -6,7 +6,7 @@ from mimetypes import guess_type
from os import path
from pathlib import PurePath
from time import time
from typing import Any, AnyStr, Callable, Dict, Optional, Union
from typing import Any, AnyStr, Callable
from urllib.parse import quote_plus
from sanic.compat import Header, open_async, stat_async
@ -19,7 +19,7 @@ from .types import HTTPResponse, JSONResponse, ResponseStream
def empty(
status: int = 204, headers: Optional[Dict[str, str]] = None
status: int = 204, headers: dict[str, str] | None = None
) -> HTTPResponse:
"""Returns an empty response to the client.
@ -36,9 +36,9 @@ def empty(
def json(
body: Any,
status: int = 200,
headers: Optional[Dict[str, str]] = None,
headers: dict[str, str] | None = None,
content_type: str = "application/json",
dumps: Optional[Callable[..., str]] = None,
dumps: Callable[..., str] | None = None,
**kwargs: Any,
) -> JSONResponse:
"""Returns response object with body in json format.
@ -67,7 +67,7 @@ def json(
def text(
body: str,
status: int = 200,
headers: Optional[Dict[str, str]] = None,
headers: dict[str, str] | None = None,
content_type: str = "text/plain; charset=utf-8",
) -> HTTPResponse:
"""Returns response object with body in text format.
@ -95,9 +95,9 @@ def text(
def raw(
body: Optional[AnyStr],
body: AnyStr | None,
status: int = 200,
headers: Optional[Dict[str, str]] = None,
headers: dict[str, str] | None = None,
content_type: str = DEFAULT_HTTP_CONTENT_TYPE,
) -> HTTPResponse:
"""Returns response object without encoding the body.
@ -120,9 +120,9 @@ def raw(
def html(
body: Union[str, bytes, HTMLProtocol],
body: str | (bytes | HTMLProtocol),
status: int = 200,
headers: Optional[Dict[str, str]] = None,
headers: dict[str, str] | None = None,
) -> HTTPResponse:
"""Returns response object with body in html format.
@ -151,8 +151,8 @@ def html(
async def validate_file(
request_headers: Header, last_modified: Union[datetime, float, int]
) -> Optional[HTTPResponse]:
request_headers: Header, last_modified: datetime | (float | int)
) -> HTTPResponse | None:
"""Validate file based on request headers.
Args:
@ -204,17 +204,17 @@ async def validate_file(
async def file(
location: Union[str, PurePath],
location: str | PurePath,
status: int = 200,
request_headers: Optional[Header] = None,
request_headers: Header | None = None,
validate_when_requested: bool = True,
mime_type: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
filename: Optional[str] = None,
last_modified: Optional[Union[datetime, float, int, Default]] = _default,
max_age: Optional[Union[float, int]] = None,
no_store: Optional[bool] = None,
_range: Optional[Range] = None,
mime_type: str | None = None,
headers: dict[str, str] | None = None,
filename: str | None = None,
last_modified: datetime | (float | (int | Default)) | None = _default,
max_age: float | int | None = None,
no_store: bool | None = None,
_range: Range | None = None,
) -> HTTPResponse:
"""Return a response object with file data.
@ -301,7 +301,7 @@ async def file(
def redirect(
to: str,
headers: Optional[Dict[str, str]] = None,
headers: dict[str, str] | None = None,
status: int = 302,
content_type: str = "text/html; charset=utf-8",
) -> HTTPResponse:
@ -330,13 +330,13 @@ def redirect(
async def file_stream(
location: Union[str, PurePath],
location: str | PurePath,
status: int = 200,
chunk_size: int = 4096,
mime_type: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
filename: Optional[str] = None,
_range: Optional[Range] = None,
mime_type: str | None = None,
headers: dict[str, str] | None = None,
filename: str | None = None,
_range: Range | None = None,
) -> ResponseStream:
"""Return a streaming response object with file data.

View File

@ -8,12 +8,8 @@ from typing import (
AnyStr,
Callable,
Coroutine,
Dict,
Iterator,
Optional,
Tuple,
TypeVar,
Union,
)
from sanic.compat import Header
@ -66,18 +62,18 @@ class BaseHTTPResponse:
def __init__(self):
self.asgi: bool = False
self.body: Optional[bytes] = None
self.content_type: Optional[str] = None
self.stream: Optional[Union[Http, ASGIApp, HTTPReceiver]] = None
self.body: bytes | None = None
self.content_type: str | None = None
self.stream: Http | (ASGIApp | HTTPReceiver) | None = None
self.status: int = None
self.headers = Header({})
self._cookies: Optional[CookieJar] = None
self._cookies: CookieJar | None = None
def __repr__(self):
class_name = self.__class__.__name__
return f"<{class_name}: {self.status} {self.content_type}>"
def _encode_body(self, data: Optional[AnyStr]):
def _encode_body(self, data: AnyStr | None):
if data is None:
return b""
return (
@ -98,7 +94,7 @@ class BaseHTTPResponse:
return self._cookies
@property
def processed_headers(self) -> Iterator[Tuple[bytes, bytes]]:
def processed_headers(self) -> Iterator[tuple[bytes, bytes]]:
"""Obtain a list of header tuples encoded in bytes for sending.
Add and remove headers based on status and content_type.
@ -119,8 +115,8 @@ class BaseHTTPResponse:
async def send(
self,
data: Optional[AnyStr] = None,
end_stream: Optional[bool] = None,
data: AnyStr | None = None,
end_stream: bool | None = None,
) -> None:
"""Send any pending response headers and the given data as body.
@ -157,14 +153,14 @@ class BaseHTTPResponse:
value: str,
*,
path: str = "/",
domain: Optional[str] = None,
domain: str | None = None,
secure: bool = True,
max_age: Optional[int] = None,
expires: Optional[datetime] = None,
max_age: int | None = None,
expires: datetime | None = None,
httponly: bool = False,
samesite: Optional[SameSite] = "Lax",
samesite: SameSite | None = "Lax",
partitioned: bool = False,
comment: Optional[str] = None,
comment: str | None = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> Cookie:
@ -211,7 +207,7 @@ class BaseHTTPResponse:
key: str,
*,
path: str = "/",
domain: Optional[str] = None,
domain: str | None = None,
host_prefix: bool = False,
secure_prefix: bool = False,
) -> None:
@ -255,14 +251,14 @@ class HTTPResponse(BaseHTTPResponse):
def __init__(
self,
body: Optional[Any] = None,
body: Any | None = None,
status: int = 200,
headers: Optional[Union[Header, Dict[str, str]]] = None,
content_type: Optional[str] = None,
headers: Header | dict[str, str] | None = None,
content_type: str | None = None,
):
super().__init__()
self.content_type: Optional[str] = content_type
self.content_type: str | None = content_type
self.body = self._encode_body(body)
self.status = status
self.headers = Header(headers or {})
@ -306,11 +302,11 @@ class JSONResponse(HTTPResponse):
def __init__(
self,
body: Optional[Any] = None,
body: Any | None = None,
status: int = 200,
headers: Optional[Union[Header, Dict[str, str]]] = None,
headers: Header | dict[str, str] | None = None,
content_type: str = "application/json",
dumps: Optional[Callable[..., str]] = None,
dumps: Callable[..., str] | None = None,
**kwargs: Any,
):
self._initialized = False
@ -337,7 +333,7 @@ class JSONResponse(HTTPResponse):
)
@property
def raw_body(self) -> Optional[Any]:
def raw_body(self) -> Any | None:
"""Returns the raw body, as long as body has not been manually set previously.
NOTE: This object should not be mutated, as it will not be
@ -361,7 +357,7 @@ class JSONResponse(HTTPResponse):
self._raw_body = value
@property # type: ignore
def body(self) -> Optional[bytes]: # type: ignore
def body(self) -> bytes | None: # type: ignore
"""Returns the response body.
Returns:
@ -370,7 +366,7 @@ class JSONResponse(HTTPResponse):
return self._body
@body.setter
def body(self, value: Optional[bytes]):
def body(self, value: bytes | None):
self._body = value
if not self._initialized:
return
@ -379,7 +375,7 @@ class JSONResponse(HTTPResponse):
def set_body(
self,
body: Any,
dumps: Optional[Callable[..., str]] = None,
dumps: Callable[..., str] | None = None,
**dumps_kwargs: Any,
) -> None:
"""Set the response body to the given value, using the given dumps function
@ -526,12 +522,12 @@ class ResponseStream:
def __init__(
self,
streaming_fn: Callable[
[Union[BaseHTTPResponse, ResponseStream]],
[BaseHTTPResponse | ResponseStream],
Coroutine[Any, Any, None],
],
status: int = 200,
headers: Optional[Union[Header, Dict[str, str]]] = None,
content_type: Optional[str] = None,
headers: Header | dict[str, str] | None = None,
content_type: str | None = None,
):
if headers is None:
headers = Header()
@ -541,8 +537,8 @@ class ResponseStream:
self.status = status
self.headers = headers or Header()
self.content_type = content_type
self.request: Optional[Request] = None
self._cookies: Optional[CookieJar] = None
self.request: Request | None = None
self._cookies: CookieJar | None = None
async def write(self, message: str):
await self.response.send(message)

View File

@ -2,7 +2,7 @@ from __future__ import annotations
from functools import lru_cache
from inspect import signature
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from typing import Any, Iterable
from uuid import UUID
from sanic_routing import BaseRouter
@ -27,8 +27,8 @@ class Router(BaseRouter):
ALLOWED_METHODS = HTTP_METHODS
def _get(
self, path: str, method: str, host: Optional[str]
) -> Tuple[Route, RouteHandler, Dict[str, Any]]:
self, path: str, method: str, host: str | None
) -> tuple[Route, RouteHandler, dict[str, Any]]:
try:
return self.resolve(
path=path,
@ -48,8 +48,8 @@ class Router(BaseRouter):
@lru_cache(maxsize=ROUTER_CACHE_SIZE)
def get( # type: ignore
self, path: str, method: str, host: Optional[str]
) -> Tuple[Route, RouteHandler, Dict[str, Any]]:
self, path: str, method: str, host: str | None
) -> tuple[Route, RouteHandler, dict[str, Any]]:
"""Retrieve a `Route` object containing the details about how to handle a response for a given request
:param request: the incoming request object
@ -78,18 +78,18 @@ class Router(BaseRouter):
uri: str,
methods: Iterable[str],
handler: RouteHandler,
host: Optional[Union[str, Iterable[str]]] = None,
host: str | Iterable[str] | None = None,
strict_slashes: bool = False,
stream: bool = False,
ignore_body: bool = False,
version: Optional[Union[str, float, int]] = None,
name: Optional[str] = None,
version: str | (float | int) | None = None,
name: str | None = None,
unquote: bool = False,
static: bool = False,
version_prefix: str = "/v",
overwrite: bool = False,
error_format: Optional[str] = None,
) -> Union[Route, List[Route]]:
error_format: str | None = None,
) -> Route | list[Route]:
"""Add a handler to the router
Args:
@ -115,15 +115,15 @@ class Router(BaseRouter):
uri = self._normalize(uri, handler)
params = dict(
path=uri,
handler=handler,
methods=frozenset(map(str, methods)) if methods else None,
name=name,
strict=strict_slashes,
unquote=unquote,
overwrite=overwrite,
)
params = {
"path": uri,
"handler": handler,
"methods": frozenset(map(str, methods)) if methods else None,
"name": name,
"strict": strict_slashes,
"unquote": unquote,
"overwrite": overwrite,
}
if isinstance(host, str):
hosts = [host]
@ -163,8 +163,8 @@ class Router(BaseRouter):
@lru_cache(maxsize=ROUTER_CACHE_SIZE)
def find_route_by_view_name(
self, view_name: str, name: Optional[str] = None
) -> Optional[Route]:
self, view_name: str, name: str | None = None
) -> Route | None:
"""Find a route in the router based on the specified view name.
Args:
@ -188,7 +188,7 @@ class Router(BaseRouter):
return route
@property
def routes_all(self) -> Dict[Tuple[str, ...], Route]:
def routes_all(self) -> dict[tuple[str, ...], Route]:
"""Return all routes in the router.
Returns:
@ -197,7 +197,7 @@ class Router(BaseRouter):
return {route.parts: route for route in self.routes}
@property
def routes_static(self) -> Dict[Tuple[str, ...], Route]:
def routes_static(self) -> dict[tuple[str, ...], Route]:
"""Return all static routes in the router.
_In this context "static" routes do not refer to the `app.static()`
@ -210,7 +210,7 @@ class Router(BaseRouter):
return self.static_routes
@property
def routes_dynamic(self) -> Dict[Tuple[str, ...], Route]:
def routes_dynamic(self) -> dict[tuple[str, ...], Route]:
"""Return all dynamic routes in the router.
_Dynamic routes are routes that contain path parameters._
@ -221,7 +221,7 @@ class Router(BaseRouter):
return self.dynamic_routes
@property
def routes_regex(self) -> Dict[Tuple[str, ...], Route]:
def routes_regex(self) -> dict[tuple[str, ...], Route]:
"""Return all regex routes in the router.
_Regex routes are routes that contain path parameters with regex

View File

@ -1,7 +1,7 @@
from __future__ import annotations
from inspect import isawaitable
from typing import TYPE_CHECKING, Any, Callable, Iterable, Optional
from typing import TYPE_CHECKING, Any, Callable, Iterable
if TYPE_CHECKING:
@ -9,9 +9,9 @@ if TYPE_CHECKING:
def trigger_events(
events: Optional[Iterable[Callable[..., Any]]],
events: Iterable[Callable[..., Any]] | None,
loop,
app: Optional[Sanic] = None,
app: Sanic | None = None,
):
"""Trigger event callbacks (functions or async)

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING
from sanic.exceptions import RequestCancelled
@ -47,9 +47,9 @@ class SanicProtocol(asyncio.Protocol):
self.loop = loop
self.app: Sanic = app
self.signal = signal or Signal()
self.transport: Optional[Transport] = None
self.transport: Transport | None = None
self.connections = connections if connections is not None else set()
self.conn_info: Optional[ConnInfo] = None
self.conn_info: ConnInfo | None = None
self._can_write = asyncio.Event()
self._can_write.set()
self._unix = unix
@ -82,7 +82,7 @@ class SanicProtocol(asyncio.Protocol):
self._data_received.clear()
await self._data_received.wait()
def close(self, timeout: Optional[float] = None):
def close(self, timeout: float | None = None):
"""
Attempt close the connection.
"""

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING
from sanic.http.constants import HTTP
from sanic.http.http3 import Http3
@ -55,7 +55,7 @@ class HttpProtocolMixin:
...
def _setup(self):
self.request: Optional[Request] = None
self.request: Request | None = None
self.access_log = self.app.config.ACCESS_LOG
self.request_handler = self.app.handle_request
self.error_handler = self.app.error_handler
@ -295,7 +295,7 @@ class Http3Protocol(HttpProtocolMixin, ConnectionProtocol): # type: ignore
self.app = app
super().__init__(*args, **kwargs)
self._setup()
self._connection: Optional[H3Connection] = None
self._connection: H3Connection | None = None
def quic_event_received(self, event: QuicEvent) -> None:
logger.debug(
@ -319,5 +319,5 @@ class Http3Protocol(HttpProtocolMixin, ConnectionProtocol): # type: ignore
self._http.http_event_received(http_event)
@property
def connection(self) -> Optional[H3Connection]:
def connection(self) -> H3Connection | None:
return self._connection

View File

@ -104,12 +104,10 @@ class WebSocketProtocol(HttpProtocol):
# but ServerProtocol needs a list
subprotocols = cast(
Optional[Sequence[Subprotocol]],
list(
[
Subprotocol(subprotocol)
for subprotocol in subprotocols
]
),
],
)
ws_proto = ServerProtocol(
max_size=self.websocket_max_size,

View File

@ -1,7 +1,7 @@
from __future__ import annotations
from ssl import SSLContext
from typing import TYPE_CHECKING, Dict, Optional, Type, Union
from typing import TYPE_CHECKING
from sanic.config import Config
from sanic.exceptions import ServerError
@ -42,12 +42,12 @@ def serve(
host,
port,
app: Sanic,
ssl: Optional[SSLContext] = None,
sock: Optional[socket.socket] = None,
unix: Optional[str] = None,
ssl: SSLContext | None = None,
sock: socket.socket | None = None,
unix: str | None = None,
reuse_port: bool = False,
loop=None,
protocol: Type[asyncio.Protocol] = HttpProtocol,
protocol: type[asyncio.Protocol] = HttpProtocol,
backlog: int = 100,
register_sys_signals: bool = True,
run_multiple: bool = False,
@ -348,8 +348,8 @@ def _serve_http_3(
def _build_protocol_kwargs(
protocol: Type[asyncio.Protocol], config: Config
) -> Dict[str, Union[int, float]]:
protocol: type[asyncio.Protocol], config: Config
) -> dict[str, int | float]:
if hasattr(protocol, "websocket_handshake"):
return {
"websocket_max_size": config.WEBSOCKET_MAX_SIZE,

View File

@ -6,7 +6,7 @@ import socket
import stat
from ipaddress import ip_address
from typing import Any, Dict, Optional
from typing import Any
from sanic.exceptions import ServerError
from sanic.http.constants import HTTP
@ -77,7 +77,7 @@ def bind_unix_socket(path: str, *, mode=0o666, backlog=100) -> socket.socket:
return sock
def remove_unix_socket(path: Optional[str]) -> None:
def remove_unix_socket(path: str | None) -> None:
"""Remove dead unix socket during server exit."""
if not path:
return
@ -94,8 +94,8 @@ def remove_unix_socket(path: Optional[str]) -> None:
def configure_socket(
server_settings: Dict[str, Any]
) -> Optional[socket.SocketType]:
server_settings: dict[str, Any]
) -> socket.SocketType | None:
# Create a listening socket or use the one in settings
if server_settings.get("version") is HTTP.VERSION_3:
return None

View File

@ -4,7 +4,7 @@ import asyncio
from enum import Enum
from inspect import isawaitable
from typing import Any, Dict, List, Optional, Tuple, Union, cast
from typing import Any, cast
from sanic_routing import BaseRouter, Route, RouteGroup
from sanic_routing.exceptions import NotFound
@ -96,7 +96,7 @@ class SignalRouter(BaseRouter):
def get( # type: ignore
self,
event: str,
condition: Optional[Dict[str, str]] = None,
condition: dict[str, str] | None = None,
):
"""Get the handlers for a signal
@ -121,7 +121,7 @@ class SignalRouter(BaseRouter):
)
except NotFound:
message = "Could not find signal %s"
terms: List[Union[str, Optional[Dict[str, str]]]] = [event]
terms: list[str | dict[str, str] | None] = [event]
if extra:
message += " with %s"
terms.append(extra)
@ -144,8 +144,8 @@ class SignalRouter(BaseRouter):
async def _dispatch(
self,
event: str,
context: Optional[Dict[str, Any]] = None,
condition: Optional[Dict[str, str]] = None,
context: dict[str, Any] | None = None,
condition: dict[str, str] | None = None,
fail_not_found: bool = True,
reverse: bool = False,
) -> Any:
@ -205,12 +205,12 @@ class SignalRouter(BaseRouter):
self,
event: str,
*,
context: Optional[Dict[str, Any]] = None,
condition: Optional[Dict[str, str]] = None,
context: dict[str, Any] | None = None,
condition: dict[str, str] | None = None,
fail_not_found: bool = True,
inline: bool = False,
reverse: bool = False,
) -> Union[asyncio.Task, Any]:
) -> asyncio.Task | Any:
"""Dispatch a signal to all handlers that match the event
Args:
@ -248,7 +248,7 @@ class SignalRouter(BaseRouter):
self,
handler: SignalHandler,
event: str,
condition: Optional[Dict[str, Any]] = None,
condition: dict[str, Any] | None = None,
exclusive: bool = True,
) -> Signal:
event_definition = event
@ -302,7 +302,7 @@ class SignalRouter(BaseRouter):
return super().finalize(do_compile=do_compile, do_optimize=do_optimize)
def _build_event_parts(self, event: str) -> Tuple[str, str, str]:
def _build_event_parts(self, event: str) -> tuple[str, str, str]:
parts = path_to_parts(event, self.delimiter)
if (
len(parts) != 3

View File

@ -1,7 +1,7 @@
from __future__ import annotations
from ast import Assign, Constant, NodeTransformer, Subscript
from typing import TYPE_CHECKING, Any, List
from typing import TYPE_CHECKING, Any
from sanic.http.constants import HTTP
@ -15,7 +15,7 @@ if TYPE_CHECKING:
class AltSvcCheck(BaseScheme):
ident = "ALTSVC"
def visitors(self) -> List[NodeTransformer]:
def visitors(self) -> list[NodeTransformer]:
return [RemoveAltSvc(self.app, self.app.state.verbosity)]

View File

@ -117,7 +117,7 @@ def load_module_from_file_location(
compile(config_file.read(), location, "exec"),
module.__dict__,
)
except IOError as e:
except OSError as e:
e.strerror = "Unable to load configuration file (e.strerror)"
raise
except Exception as e:
@ -128,4 +128,4 @@ def load_module_from_file_location(
try:
return import_string(location)
except ValueError:
raise IOError("Unable to load configuration %s" % str(location))
raise OSError("Unable to load configuration %s" % str(location))

View File

@ -5,9 +5,6 @@ from typing import (
Any,
Callable,
Iterable,
List,
Optional,
Union,
)
from sanic.models.handler_types import RouteHandler
@ -115,19 +112,19 @@ class HTTPMethodView:
to `"/v"`.
"""
get: Optional[Callable[..., Any]]
get: Callable[..., Any] | None
decorators: List[Callable[[Callable[..., Any]], Callable[..., Any]]] = []
decorators: list[Callable[[Callable[..., Any]], Callable[..., Any]]] = []
def __init_subclass__(
cls,
attach: Optional[Union[Sanic, Blueprint]] = None,
attach: Sanic | Blueprint | None = None,
uri: str = "",
methods: Iterable[str] = frozenset({"GET"}),
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
host: str | None = None,
strict_slashes: bool | None = None,
version: int | None = None,
name: str | None = None,
stream: bool = False,
version_prefix: str = "/v",
) -> None:
@ -203,13 +200,13 @@ class HTTPMethodView:
@classmethod
def attach(
cls,
to: Union[Sanic, Blueprint],
to: Sanic | Blueprint,
uri: str,
methods: Iterable[str] = frozenset({"GET"}),
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
host: str | None = None,
strict_slashes: bool | None = None,
version: int | None = None,
name: str | None = None,
stream: bool = False,
version_prefix: str = "/v",
) -> None:

View File

@ -5,7 +5,7 @@ from inspect import isawaitable
from multiprocessing.connection import Connection
from os import environ
from pathlib import Path
from typing import Any, Dict, Mapping, Union
from typing import Any, Mapping
from sanic.exceptions import Unauthorized
from sanic.helpers import Default
@ -39,13 +39,13 @@ class Inspector:
def __init__(
self,
publisher: Connection,
app_info: Dict[str, Any],
app_info: dict[str, Any],
worker_state: Mapping[str, Any],
host: str,
port: int,
api_key: str,
tls_key: Union[Path, str, Default],
tls_cert: Union[Path, str, Default],
tls_key: Path | (str | Default),
tls_cert: Path | (str | Default),
):
self._publisher = publisher
self.app_info = app_info
@ -106,13 +106,13 @@ class Inspector:
name = request.match_info.get("action", "info")
return json({"meta": {"action": name}, "result": output})
def _state_to_json(self) -> Dict[str, Any]:
def _state_to_json(self) -> dict[str, Any]:
output = {"info": self.app_info}
output["workers"] = self._make_safe(dict(self.worker_state))
return output
@staticmethod
def _make_safe(obj: Dict[str, Any]) -> Dict[str, Any]:
def _make_safe(obj: dict[str, Any]) -> dict[str, Any]:
for key, value in obj.items():
if isinstance(value, dict):
obj[key] = Inspector._make_safe(value)
@ -132,7 +132,7 @@ class Inspector:
message += ":STARTUP_FIRST"
self._publisher.send(message)
def scale(self, replicas: Union[str, int]) -> str:
def scale(self, replicas: str | int) -> str:
"""Scale the number of workers
Args:

View File

@ -8,7 +8,7 @@ from importlib import import_module
from inspect import isfunction
from pathlib import Path
from ssl import SSLContext
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Union, cast
from typing import TYPE_CHECKING, Any, Callable, cast
from sanic.http.tls.context import process_to_context
from sanic.http.tls.creators import MkcertCreator, TrustmeCreator
@ -41,7 +41,7 @@ class AppLoader:
as_factory: bool = False,
as_simple: bool = False,
args: Any = None,
factory: Optional[Callable[[], SanicApp]] = None,
factory: Callable[[], SanicApp] | None = None,
) -> None:
self.module_input = module_input
self.module_name = ""
@ -134,9 +134,7 @@ class CertLoader:
def __init__(
self,
ssl_data: Optional[
Union[SSLContext, Dict[str, Union[str, os.PathLike]]]
],
ssl_data: SSLContext | dict[str, str | os.PathLike] | None,
):
self._ssl_data = ssl_data
self._creator_class = None

View File

@ -324,15 +324,13 @@ class WorkerManager:
def processes(self):
"""Get all of the processes."""
for worker in self.workers:
for process in worker.processes:
yield process
yield from worker.processes
@property
def transient_processes(self):
"""Get all of the transient processes."""
for worker in self.transient.values():
for process in worker.processes:
yield process
yield from worker.processes
def kill(self):
"""Kill all of the processes."""

View File

@ -10,7 +10,6 @@ from pathlib import Path
from signal import SIGINT, SIGTERM
from signal import signal as signal_func
from time import sleep
from typing import Dict, Set
from sanic.server.events import trigger_events
from sanic.worker.loader import AppLoader
@ -23,7 +22,7 @@ class Reloader:
self,
publisher: Connection,
interval: float,
reload_dirs: Set[Path],
reload_dirs: set[Path],
app_loader: AppLoader,
):
self._publisher = publisher
@ -36,7 +35,7 @@ class Reloader:
app = self.app_loader.load()
signal_func(SIGINT, self.stop)
signal_func(SIGTERM, self.stop)
mtimes: Dict[str, float] = {}
mtimes: dict[str, float] = {}
reloader_start = app.listeners.get("reload_process_start")
reloader_stop = app.listeners.get("reload_process_stop")

View File

@ -104,7 +104,7 @@ def _fetch_current_version(config_file: str) -> str:
def _change_micro_version(current_version: str):
version_string = current_version.split(".")
version_string[-1] = str((int(version_string[-1]) + 1))
version_string[-1] = str(int(version_string[-1]) + 1)
return ".".join(version_string)
@ -231,7 +231,7 @@ def _tag_release(new_version, current_version, milestone, release_name, token):
)
out, error, ret = _run_shell_command(command=command)
if int(ret) != 0:
print("Failed to execute the command: {}".format(command[0]))
print(f"Failed to execute the command: {command[0]}")
sys.exit(1)
change_log = _generate_markdown_document(

View File

@ -14,7 +14,7 @@ class AsyncMock(Mock):
def __call__(self, *args, **kwargs):
self.call_count += 1
parent = super(AsyncMock, self)
parent = super()
async def dummy():
self.await_count += 1

View File

@ -24,7 +24,7 @@ class TestSanicRouteResolution:
router, simple_routes = sanic_router(route_details=simple_routes)
route_to_call = choice(simple_routes)
request = Request(
"/{}".format(route_to_call[-1]).encode(),
f"/{route_to_call[-1]}".encode(),
{"host": "localhost"},
"v1",
route_to_call[0],
@ -58,9 +58,9 @@ class TestSanicRouteResolution:
template=route_to_call[-1]
)
print("{} -> {}".format(route_to_call[-1], url))
print(f"{route_to_call[-1]} -> {url}")
request = Request(
"/{}".format(url).encode(),
f"/{url}".encode(),
{"host": "localhost"},
"v1",
route_to_call[0],

View File

@ -23,10 +23,10 @@ for n in range(6):
setup="from sanic.response import json",
number=100000,
)
print("Took {} seconds".format(time))
print(f"Took {time} seconds")
total_time += time
times += 1
print("Average: {}".format(total_time / times))
print(f"Average: {total_time / times}")
print("Running Old 100,000 times")
times = 0
@ -37,7 +37,7 @@ for n in range(6):
setup="from sanic.response import json",
number=100000,
)
print("Took {} seconds".format(time))
print(f"Took {time} seconds")
total_time += time
times += 1
print("Average: {}".format(total_time / times))
print(f"Average: {total_time / times}")

View File

@ -28,7 +28,7 @@ def test(request):
@app.route("/text/<name>/<butt:int>")
def rtext(request, name, butt):
return text("yeehaww {} {}".format(name, butt))
return text(f"yeehaww {name} {butt}")
@app.route("/exception")

View File

@ -41,7 +41,7 @@ if __name__ == "__main__":
from wsgiref.simple_server import make_server
try:
print("Visit http://localhost:{}/".format(sys.argv[-1]))
print(f"Visit http://localhost:{sys.argv[-1]}/")
make_server("", int(sys.argv[-1]), main).serve_forever()
except KeyboardInterrupt:
pass

View File

@ -51,7 +51,7 @@ def test_asyncio_server_no_start_serving(app: Sanic):
asyncio_srv_coro = app.create_server(
port=43123,
return_asyncio_server=True,
asyncio_server_kwargs=dict(start_serving=False),
asyncio_server_kwargs={"start_serving": False},
)
srv = loop.run_until_complete(asyncio_srv_coro)
assert srv.is_serving() is False
@ -63,7 +63,7 @@ def test_asyncio_server_start_serving(app: Sanic):
asyncio_srv_coro = app.create_server(
port=43124,
return_asyncio_server=True,
asyncio_server_kwargs=dict(start_serving=False),
asyncio_server_kwargs={"start_serving": False},
)
srv = loop.run_until_complete(asyncio_srv_coro)
assert srv.is_serving() is False
@ -96,7 +96,7 @@ def test_create_server_no_startup(app: Sanic):
asyncio_srv_coro = app.create_server(
port=43124,
return_asyncio_server=True,
asyncio_server_kwargs=dict(start_serving=False),
asyncio_server_kwargs={"start_serving": False},
)
srv = loop.run_until_complete(asyncio_srv_coro)
message = (
@ -488,7 +488,7 @@ def test_uvloop_cannot_never_called_with_create_server(caplog, monkeypatch):
for app in apps:
srv_coro = app.create_server(
return_asyncio_server=True,
asyncio_server_kwargs=dict(start_serving=False),
asyncio_server_kwargs={"start_serving": False},
)
loop.run_until_complete(srv_coro)
@ -526,7 +526,7 @@ def test_multiple_uvloop_configs_display_warning(caplog):
for app in (default_uvloop, no_uvloop, yes_uvloop):
srv_coro = app.create_server(
return_asyncio_server=True,
asyncio_server_kwargs=dict(start_serving=False),
asyncio_server_kwargs={"start_serving": False},
)
srv = loop.run_until_complete(srv_coro)
loop.run_until_complete(srv.startup())

View File

@ -483,7 +483,7 @@ def test_bp_exception_handler_applied(app: Sanic):
@handled.exception(Error)
def handle_error(req, e):
return text("handled {}".format(e))
return text(f"handled {e}")
@handled.route("/ok")
def ok(request):
@ -513,7 +513,7 @@ def test_bp_exception_handler_not_applied(app: Sanic):
@handled.exception(Error)
def handle_error(req, e):
return text("handled {}".format(e))
return text(f"handled {e}")
@nothandled.route("/notok")
def notok(request):

View File

@ -545,9 +545,9 @@ def test_guess_mime_logging(
with caplog.at_level(logging.DEBUG, logger="sanic.root"):
guess_mime(fake_request, fallback)
(logmsg,) = [
(logmsg,) = (
r.message for r in caplog.records if r.funcName == "guess_mime"
]
)
assert logmsg == expected

View File

@ -308,7 +308,7 @@ def test_contextual_exception_context(debug):
assert dl == {"foo": "bar"}
_, response = app.test_client.post("/coffee/text", debug=debug)
lines = list(map(lambda x: x.decode(), response.body.split(b"\n")))
lines = [x.decode() for x in response.body.split(b"\n")]
idx = lines.index("Context") + 1
assert response.status == 418
assert lines[2] == "Sorry, I cannot brew coffee"
@ -358,7 +358,7 @@ def test_contextual_exception_extra(debug):
assert not dl
_, response = app.test_client.post("/coffee/text", debug=debug)
lines = list(map(lambda x: x.decode(), response.body.split(b"\n")))
lines = [x.decode() for x in response.body.split(b"\n")]
assert response.status == 418
assert lines[2] == "Found bar"
if debug:

View File

@ -402,7 +402,7 @@ def test_accept_misc():
assert m.header.type == "*"
assert m.header.subtype == "plain"
assert m.header.q == 1.0
assert m.header.params == dict(param="123")
assert m.header.params == {"param": "123"}
# Matches object against another Matched object (by mime and header)
assert m == a.match("text/*")
# Against unsupported type falls back to object id matching

View File

@ -1,5 +1,4 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio

View File

@ -129,10 +129,10 @@ def scanner(proc, trigger="complete"):
yield line
argv = dict(
script=[sys.executable, "reloader.py"],
module=[sys.executable, "-m", "reloader"],
sanic=[
argv = {
"script": [sys.executable, "reloader.py"],
"module": [sys.executable, "-m", "reloader"],
"sanic": [
sys.executable,
"-m",
"sanic",
@ -141,14 +141,14 @@ argv = dict(
"--auto-reload",
"reloader.app",
],
)
}
@pytest.mark.parametrize(
"runargs, mode",
[
(dict(port=42202, auto_reload=True), "script"),
(dict(port=42203, auto_reload=True), "module"),
({"port": 42202, "auto_reload": True}, "script"),
({"port": 42203, "auto_reload": True}, "module"),
({}, "sanic"),
],
)
@ -180,8 +180,8 @@ async def test_reloader_live(runargs, mode):
@pytest.mark.parametrize(
"runargs, mode",
[
(dict(port=42302, auto_reload=True), "script"),
(dict(port=42303, auto_reload=True), "module"),
({"port": 42302, "auto_reload": True}, "script"),
({"port": 42303, "auto_reload": True}, "module"),
({}, "sanic"),
],
)

View File

@ -573,7 +573,7 @@ def test_streaming_echo():
async def client(app, reader, writer):
# Unfortunately httpx does not support 2-way streaming, so do it by hand.
host = "host: localhost:8000\r\n".encode()
host = b"host: localhost:8000\r\n"
writer.write(
b"POST /echo HTTP/1.1\r\n" + host + b"content-length: 2\r\n"
b"content-type: text/plain; charset=utf-8\r\n"

View File

@ -2260,7 +2260,7 @@ def test_conflicting_body_methods_overload(app: Sanic):
assert response.json == {
"name": "test_conflicting_body_methods_overload.delete",
"foo": "test",
"body": str("".encode()),
"body": str(b""),
}

View File

@ -117,7 +117,7 @@ def test_custom_dumps_and_kwargs(json_app: Sanic):
return json_response(JSON_BODY, dumps=custom_dumps, prry="platypus")
_, resp = json_app.test_client.get("/json-custom")
assert resp.body == "custom".encode()
assert resp.body == b"custom"
custom_dumps.assert_called_once_with(JSON_BODY, prry="platypus")
@ -135,7 +135,7 @@ def test_override_dumps_and_kwargs(json_app: Sanic):
_, resp = json_app.test_client.get("/json-custom")
assert resp.body == "custom2".encode()
assert resp.body == b"custom2"
custom_dumps_1.assert_called_once_with(JSON_BODY, prry="platypus")
custom_dumps_2.assert_called_once_with(JSON_BODY, platypus="prry")

View File

@ -1124,8 +1124,8 @@ def test_route_invalid_host(app):
return text("pass")
assert str(excinfo.value) == (
"Expected either string or Iterable of " "host strings, not {!r}"
).format(host)
"Expected either string or Iterable of " f"host strings, not {host!r}"
)
def test_route_with_regex_group(app):

View File

@ -324,7 +324,7 @@ def test_static_content_range_error(app, file_name, static_file_directory):
assert response.status == 416
assert "Content-Length" in response.headers
assert "Content-Range" in response.headers
assert response.headers["Content-Range"] == "bytes */%s" % (
assert response.headers["Content-Range"] == "bytes */{}".format(
len(get_file_content(static_file_directory, file_name)),
)

View File

@ -14,24 +14,24 @@ from sanic.response import text
from sanic.views import HTTPMethodView
URL_FOR_ARGS1 = dict(arg1=["v1", "v2"])
URL_FOR_ARGS1 = {"arg1": ["v1", "v2"]}
URL_FOR_VALUE1 = "/myurl?arg1=v1&arg1=v2"
URL_FOR_ARGS2 = dict(arg1=["v1", "v2"], _anchor="anchor")
URL_FOR_ARGS2 = {"arg1": ["v1", "v2"], "_anchor": "anchor"}
URL_FOR_VALUE2 = "/myurl?arg1=v1&arg1=v2#anchor"
URL_FOR_ARGS3 = dict(
arg1="v1",
_anchor="anchor",
_scheme="http",
_server=f"{test_host}:{test_port}",
_external=True,
)
URL_FOR_ARGS3 = {
"arg1": "v1",
"_anchor": "anchor",
"_scheme": "http",
"_server": f"{test_host}:{test_port}",
"_external": True,
}
URL_FOR_VALUE3 = f"http://{test_host}:{test_port}/myurl?arg1=v1#anchor"
URL_FOR_ARGS4 = dict(
arg1="v1",
_anchor="anchor",
_external=True,
_server=f"http://{test_host}:{test_port}",
)
URL_FOR_ARGS4 = {
"arg1": "v1",
"_anchor": "anchor",
"_external": True,
"_server": f"http://{test_host}:{test_port}",
}
URL_FOR_VALUE4 = f"http://{test_host}:{test_port}/myurl?arg1=v1#anchor"

View File

@ -465,7 +465,7 @@ def test_static_content_range_error(app, file_name, static_file_directory):
assert response.status == 416
assert "Content-Length" in response.headers
assert "Content-Range" in response.headers
assert response.headers["Content-Range"] == "bytes */%s" % (
assert response.headers["Content-Range"] == "bytes */{}".format(
len(get_file_content(static_file_directory, file_name)),
)
@ -482,6 +482,6 @@ def test_static_content_range_error(app, file_name, static_file_directory):
assert response.status == 416
assert "Content-Length" in response.headers
assert "Content-Range" in response.headers
assert response.headers["Content-Range"] == "bytes */%s" % (
assert response.headers["Content-Range"] == "bytes */{}".format(
len(get_file_content(static_file_directory, file_name)),
)

View File

@ -20,9 +20,8 @@ def run_check(path_location: str) -> str:
process = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
capture_output=True,
text=True,
)
output = process.stdout + process.stderr
return output

View File

@ -23,12 +23,12 @@ def test_del_state():
def test_iter_state():
result = [item for item in gen_state(one=1, two=2)]
result = list(gen_state(one=1, two=2))
assert result == ["one", "two"]
def test_state_len():
result = [item for item in gen_state(one=1, two=2)]
result = list(gen_state(one=1, two=2))
assert len(result) == 2