Merge branch 'main' into niceback-error-handling

This commit is contained in:
L. Karkkainen 2023-02-05 14:23:11 +00:00
commit 53b7a5a5a1
42 changed files with 1023 additions and 644 deletions

View File

@ -23,5 +23,6 @@ module = [
"trustme.*",
"sanic_routing.*",
"aioquic.*",
"html5tagger.*",
]
ignore_missing_imports = true

View File

@ -57,12 +57,10 @@ from sanic.config import SANIC_PREFIX, Config
from sanic.exceptions import (
BadRequest,
SanicException,
SanicIsADirectoryError,
ServerError,
URLBuildError,
)
from sanic.handlers import ErrorHandler
from sanic.handlers.directory import DirectoryHandler
from sanic.helpers import Default, _default
from sanic.http import Stage
from sanic.log import (
@ -74,6 +72,7 @@ from sanic.log import (
from sanic.middleware import Middleware, MiddlewareLocation
from sanic.mixins.listeners import ListenerEvent
from sanic.mixins.startup import StartupMixin
from sanic.mixins.static import StaticHandleMixin
from sanic.models.futures import (
FutureException,
FutureListener,
@ -81,7 +80,6 @@ from sanic.models.futures import (
FutureRegistry,
FutureRoute,
FutureSignal,
FutureStatic,
)
from sanic.models.handler_types import ListenerType, MiddlewareType
from sanic.models.handler_types import Sanic as SanicVar
@ -108,7 +106,7 @@ if OS_IS_WINDOWS: # no cov
enable_windows_color_support()
class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
class Sanic(StaticHandleMixin, BaseSanic, StartupMixin, metaclass=TouchUpMeta):
"""
The main application instance
"""
@ -142,7 +140,6 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
"config",
"configure_logging",
"ctx",
"directory_handler",
"error_handler",
"inspector_class",
"go_fast",
@ -172,7 +169,6 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
ctx: Optional[Any] = None,
router: Optional[Router] = None,
signal_router: Optional[SignalRouter] = None,
directory_handler: Optional[DirectoryHandler] = None,
error_handler: Optional[ErrorHandler] = None,
env_prefix: Optional[str] = SANIC_PREFIX,
request_class: Optional[Type[Request]] = None,
@ -217,9 +213,6 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
self.blueprints: Dict[str, Blueprint] = {}
self.configure_logging: bool = configure_logging
self.ctx: Any = ctx or SimpleNamespace()
self.directory_handler: DirectoryHandler = (
directory_handler or DirectoryHandler(self.debug)
)
self.error_handler: ErrorHandler = error_handler or ErrorHandler()
self.inspector_class: Type[Inspector] = inspector_class or Inspector
self.listeners: Dict[str, List[ListenerType[Any]]] = defaultdict(list)
@ -448,9 +441,6 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
return routes
def _apply_static(self, static: FutureStatic) -> Route:
return self._register_static(static)
def _apply_middleware(
self,
middleware: FutureMiddleware,
@ -897,11 +887,11 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
Union[
BaseHTTPResponse,
Coroutine[Any, Any, Optional[BaseHTTPResponse]],
ResponseStream,
]
] = None
run_middleware = True
try:
await self.dispatch(
"http.routing.before",
inline=True,
@ -933,7 +923,6 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
and request.stream.request_body
and not route.extra.ignore_body
):
if hasattr(handler, "is_stream"):
# Streaming handler: lift the size limit
request.stream.request_max_size = float("inf")
@ -1007,7 +996,7 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
...
await response.send(end_stream=True)
elif isinstance(response, ResponseStream):
resp = await response(request) # type: ignore
resp = await response(request)
await self.dispatch(
"http.lifecycle.response",
inline=True,
@ -1016,7 +1005,7 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
"response": resp,
},
)
await response.eof() # type: ignore
await response.eof()
else:
if not hasattr(handler, "is_websocket"):
raise ServerError(
@ -1580,11 +1569,6 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
self.state.is_started = True
self.exception(SanicIsADirectoryError)(
DirectoryHandler.default_handler
)
self.directory_handler.debug = self.debug
def ack(self):
if hasattr(self, "multiplexer"):
self.multiplexer.ack()

View File

@ -9,6 +9,7 @@ from sanic.mixins.listeners import ListenerMixin
from sanic.mixins.middleware import MiddlewareMixin
from sanic.mixins.routes import RouteMixin
from sanic.mixins.signals import SignalMixin
from sanic.mixins.static import StaticMixin
VALID_NAME = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_\-]*$")
@ -16,6 +17,7 @@ VALID_NAME = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_\-]*$")
class BaseSanic(
RouteMixin,
StaticMixin,
MiddlewareMixin,
ListenerMixin,
ExceptionMixin,

View File

@ -4,7 +4,6 @@ from sanic.compat import UpperStrEnum
class HTTPMethod(UpperStrEnum):
GET = auto()
POST = auto()
PUT = auto()
@ -15,7 +14,6 @@ class HTTPMethod(UpperStrEnum):
class LocalCertCreator(UpperStrEnum):
AUTO = auto()
TRUSTME = auto()
MKCERT = auto()
@ -34,4 +32,3 @@ CACHEABLE_HTTP_METHODS = (HTTPMethod.GET, HTTPMethod.HEAD)
DEFAULT_HTTP_CONTENT_TYPE = "application/octet-stream"
DEFAULT_LOCAL_TLS_KEY = "key.pem"
DEFAULT_LOCAL_TLS_CERT = "cert.pem"
DEFAULT_INDEX = "index.html"

View File

@ -22,8 +22,9 @@ from traceback import extract_tb
from sanic.exceptions import BadRequest, SanicException
from sanic.helpers import STATUS_CODES
from sanic.response import html, json, text
from sanic.pages.error import ErrorPage
from sanic.response import html, json, text
dumps: t.Callable[..., str]
try:
@ -391,16 +392,13 @@ CONTENT_TYPE_BY_RENDERERS = {
v: k for k, v in RENDERERS_BY_CONTENT_TYPE.items()
}
# Handler source code is checked for which response types it returns with the
# route error_format="auto" (default) to determine which format to use.
RESPONSE_MAPPING = {
"empty": "html",
"json": "json",
"text": "text",
"raw": "text",
"html": "html",
"file": "html",
"file_stream": "text",
"stream": "text",
"redirect": "html",
"JSONResponse": "json",
"text/plain": "text",
"text/html": "html",
"application/json": "json",

View File

@ -1,5 +1,4 @@
from asyncio import CancelledError
from pathlib import Path
from typing import Any, Dict, Optional, Union
from sanic.helpers import STATUS_CODES
@ -270,10 +269,3 @@ class InvalidSignal(SanicException):
class WebsocketClosed(SanicException):
quiet = True
message = "Client has closed the websocket connection"
class SanicIsADirectoryError(SanicException):
quiet = True
location: Path
autoindex: bool
index_name: str

View File

@ -4,37 +4,57 @@ from datetime import datetime
from operator import itemgetter
from pathlib import Path
from stat import S_ISDIR
from typing import Any, Coroutine, Dict, Iterable, Optional, Union, cast
from typing import Dict, Iterable, Optional, Sequence, Union, cast
from sanic.exceptions import SanicIsADirectoryError
from sanic.pages.autoindex import AutoIndex, FileInfo
from sanic.exceptions import NotFound
from sanic.pages.directory_page import DirectoryPage, FileInfo
from sanic.request import Request
from sanic.response import file, html, redirect
from sanic.response.types import HTTPResponse
class DirectoryHandler:
def __init__(self, debug: bool) -> None:
self.debug = debug
def __init__(
self,
uri: str,
directory: Path,
directory_view: bool = False,
index: Optional[Union[str, Sequence[str]]] = None,
) -> None:
if isinstance(index, str):
index = [index]
elif index is None:
index = []
self.base = uri.strip("/")
self.directory = directory
self.directory_view = directory_view
self.index = tuple(index)
def handle(
self, directory: Path, autoindex: bool, index_name: str, url: str
):
index_file = directory / index_name
if autoindex and (not index_file.exists() or not index_name):
return self.index(directory, url)
async def handle(self, request: Request, path: str):
current = path.strip("/")[len(self.base) :].strip("/") # noqa: E203
for file_name in self.index:
index_file = self.directory / current / file_name
if index_file.is_file():
return await file(index_file)
if index_name:
return file(index_file)
def index(self, directory: Path, url: str):
# Remove empty path elements, append slash
if "//" in url or not url.endswith("/"):
return redirect(
"/" + "".join([f"{p}/" for p in url.split("/") if p])
if self.directory_view:
return self._index(
self.directory / current, path, request.app.debug
)
if self.index:
raise NotFound("File not found")
raise IsADirectoryError(f"{self.directory.as_posix()} is a directory")
def _index(self, location: Path, path: str, debug: bool):
# Remove empty path elements, append slash
if "//" in path or not path.endswith("/"):
return redirect(
"/" + "".join([f"{p}/" for p in path.split("/") if p])
)
# Render file browser
page = AutoIndex(self._iter_files(directory), url, self.debug)
page = DirectoryPage(self._iter_files(location), path, debug)
return html(page.render())
def _prepare_file(self, path: Path) -> Dict[str, Union[int, str]]:
@ -57,23 +77,8 @@ class DirectoryHandler:
"file_size": stat.st_size,
}
def _iter_files(self, directory: Path) -> Iterable[FileInfo]:
prepared = [self._prepare_file(f) for f in directory.iterdir()]
def _iter_files(self, location: Path) -> Iterable[FileInfo]:
prepared = [self._prepare_file(f) for f in location.iterdir()]
for item in sorted(prepared, key=itemgetter("priority", "file_name")):
del item["priority"]
yield cast(FileInfo, item)
@classmethod
def default_handler(
cls, request: Request, exception: SanicIsADirectoryError
) -> Optional[Coroutine[Any, Any, HTTPResponse]]:
if exception.autoindex or exception.index_name:
maybe_response = request.app.directory_handler.handle(
exception.location,
exception.autoindex,
exception.index_name,
request.path,
)
if maybe_response:
return maybe_response
return None

View File

@ -126,7 +126,6 @@ class CertCreator(ABC):
local_tls_key,
local_tls_cert,
) -> CertCreator:
creator: Optional[CertCreator] = None
cert_creator_options: Tuple[

35
sanic/mixins/base.py Normal file
View File

@ -0,0 +1,35 @@
from typing import Optional
from sanic.base.meta import SanicMeta
class BaseMixin(metaclass=SanicMeta):
name: str
strict_slashes: Optional[bool]
def _generate_name(self, *objects) -> str:
name = None
for obj in objects:
if obj:
if isinstance(obj, str):
name = obj
break
try:
name = obj.name
except AttributeError:
try:
name = obj.__name__
except AttributeError:
continue
else:
break
if not name: # noqa
raise ValueError("Could not generate a name for handler")
if not name.startswith(f"{self.name}."):
name = f"{self.name}.{name}"
return name

View File

@ -1,11 +1,6 @@
from ast import NodeVisitor, Return, parse
from contextlib import suppress
from email.utils import formatdate
from functools import partial, wraps
from inspect import getsource, signature
from mimetypes import guess_type
from os import path
from pathlib import Path, PurePath
from textwrap import dedent
from typing import (
Any,
@ -19,30 +14,15 @@ from typing import (
Union,
cast,
)
from urllib.parse import unquote
from sanic_routing.route import Route
from sanic.base.meta import SanicMeta
from sanic.compat import stat_async
from sanic.constants import (
DEFAULT_HTTP_CONTENT_TYPE,
DEFAULT_INDEX,
HTTP_METHODS,
)
from sanic.constants import HTTP_METHODS
from sanic.errorpages import RESPONSE_MAPPING
from sanic.exceptions import (
FileNotFound,
HeaderNotFound,
RangeNotSatisfiable,
SanicIsADirectoryError,
)
from sanic.handlers import ContentRangeHandler
from sanic.helpers import Default, _default
from sanic.log import error_logger
from sanic.mixins.base import BaseMixin
from sanic.models.futures import FutureRoute, FutureStatic
from sanic.models.handler_types import RouteHandler
from sanic.response import HTTPResponse, file, file_stream, validate_file
from sanic.types import HashableDict
@ -51,20 +31,14 @@ RouteWrapper = Callable[
]
class RouteMixin(metaclass=SanicMeta):
name: str
class RouteMixin(BaseMixin, metaclass=SanicMeta):
def __init__(self, *args, **kwargs) -> None:
self._future_routes: Set[FutureRoute] = set()
self._future_statics: Set[FutureStatic] = set()
self.strict_slashes: Optional[bool] = False
def _apply_route(self, route: FutureRoute) -> List[Route]:
raise NotImplementedError # noqa
def _apply_static(self, static: FutureStatic) -> Route:
raise NotImplementedError # noqa
def route(
self,
uri: str,
@ -698,341 +672,6 @@ class RouteMixin(metaclass=SanicMeta):
**ctx_kwargs,
)(handler)
def static(
self,
uri: str,
file_or_directory: Union[str, bytes, PurePath],
pattern: str = r"/?.+",
use_modified_since: bool = True,
use_content_range: bool = False,
stream_large_files: bool = False,
name: str = "static",
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
content_type: Optional[bool] = None,
apply: bool = True,
resource_type: Optional[str] = None,
autoindex: bool = False,
index_name: Union[str, Default] = _default,
):
"""
Register a root to serve files from. The input can either be a
file or a directory. This method will enable an easy and simple way
to setup the :class:`Route` necessary to serve the static files.
:param uri: URL path to be used for serving static content
:param file_or_directory: Path for the Static file/directory with
static files
:param pattern: Regex Pattern identifying the valid static files
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the
:func:`StreamingHTTPResponse.file_stream` handler rather
than the :func:`HTTPResponse.file` handler to send the file.
If this is an integer, this represents the threshold size to
switch to :func:`StreamingHTTPResponse.file_stream`
:param name: user defined name used for url_for
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param content_type: user defined content type for header
:return: routes registered on the router
:rtype: List[sanic.router.Route]
"""
name = self._generate_name(name)
if strict_slashes is None and self.strict_slashes is not None:
strict_slashes = self.strict_slashes
if not isinstance(file_or_directory, (str, bytes, PurePath)):
raise ValueError(
f"Static route must be a valid path, not {file_or_directory}"
)
if isinstance(index_name, Default):
index_name = DEFAULT_INDEX
static = FutureStatic(
uri,
file_or_directory,
pattern,
use_modified_since,
use_content_range,
stream_large_files,
name,
host,
strict_slashes,
content_type,
resource_type,
autoindex,
index_name,
)
self._future_statics.add(static)
if apply:
self._apply_static(static)
def _generate_name(self, *objects) -> str:
name = None
for obj in objects:
if obj:
if isinstance(obj, str):
name = obj
break
try:
name = obj.name
except AttributeError:
try:
name = obj.__name__
except AttributeError:
continue
else:
break
if not name: # noqa
raise ValueError("Could not generate a name for handler")
if not name.startswith(f"{self.name}."):
name = f"{self.name}.{name}"
return name
async def _get_file_path(self, file_or_directory, __file_uri__, not_found):
file_path_raw = Path(unquote(file_or_directory))
root_path = file_path = file_path_raw.resolve()
if __file_uri__:
# Strip all / that in the beginning of the URL to help prevent
# python from herping a derp and treating the uri as an
# absolute path
unquoted_file_uri = unquote(__file_uri__).lstrip("/")
file_path_raw = Path(file_or_directory, unquoted_file_uri)
file_path = file_path_raw.resolve()
if (
file_path < root_path and not file_path_raw.is_symlink()
) or ".." in file_path_raw.parts:
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
try:
file_path.relative_to(root_path)
except ValueError:
if not file_path_raw.is_symlink():
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
return file_path
async def _static_request_handler(
self,
file_or_directory,
use_modified_since,
use_content_range,
stream_large_files,
request,
content_type=None,
__file_uri__=None,
autoindex=False,
index_name="",
):
not_found = FileNotFound(
"File not found",
path=file_or_directory,
relative_url=__file_uri__,
)
# Merge served directory and requested file if provided
file_path = await self._get_file_path(
file_or_directory, __file_uri__, not_found
)
try:
headers = {}
# Check if the client has been sent this file before
# and it has not been modified since
stats = None
if use_modified_since:
stats = await stat_async(file_path)
modified_since = stats.st_mtime
response = await validate_file(request.headers, modified_since)
if response:
return response
headers["Last-Modified"] = formatdate(
modified_since, usegmt=True
)
_range = None
if use_content_range:
_range = None
if not stats:
stats = await stat_async(file_path)
headers["Accept-Ranges"] = "bytes"
headers["Content-Length"] = str(stats.st_size)
if request.method != "HEAD":
try:
_range = ContentRangeHandler(request, stats)
except HeaderNotFound:
pass
else:
del headers["Content-Length"]
headers.update(_range.headers)
if "content-type" not in headers:
content_type = (
content_type
or guess_type(file_path)[0]
or DEFAULT_HTTP_CONTENT_TYPE
)
if "charset=" not in content_type and (
content_type.startswith("text/")
or content_type == "application/javascript"
):
content_type += "; charset=utf-8"
headers["Content-Type"] = content_type
if request.method == "HEAD":
return HTTPResponse(headers=headers)
else:
if stream_large_files:
if type(stream_large_files) == int:
threshold = stream_large_files
else:
threshold = 1024 * 1024
if not stats:
stats = await stat_async(file_path)
if stats.st_size >= threshold:
return await file_stream(
file_path, headers=headers, _range=_range
)
return await file(
file_path,
headers=headers,
_range=_range,
autoindex=autoindex,
index_name=index_name,
)
except (RangeNotSatisfiable, SanicIsADirectoryError):
raise
except FileNotFoundError:
raise not_found
except Exception:
error_logger.exception(
f"Exception in static request handler: "
f"path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise
def _register_static(
self,
static: FutureStatic,
):
# TODO: Though sanic is not a file server, I feel like we should
# at least make a good effort here. Modified-since is nice, but
# we could also look into etags, expires, and caching
"""
Register a static directory handler with Sanic by adding a route to the
router and registering a handler.
:param app: Sanic
:param file_or_directory: File or directory path to serve from
:type file_or_directory: Union[str,bytes,Path]
:param uri: URL to serve from
:type uri: str
:param pattern: regular expression used to match files in the URL
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the
server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the file_stream() handler
rather than the file() handler to send the file
If this is an integer, this represents the
threshold size to switch to file_stream()
:param name: user defined name used for url_for
:type name: str
:param content_type: user defined content type for header
:return: registered static routes
:rtype: List[sanic.router.Route]
"""
if isinstance(static.file_or_directory, bytes):
file_or_directory = static.file_or_directory.decode("utf-8")
elif isinstance(static.file_or_directory, PurePath):
file_or_directory = str(static.file_or_directory)
elif not isinstance(static.file_or_directory, str):
raise ValueError("Invalid file path string.")
else:
file_or_directory = static.file_or_directory
uri = static.uri
name = static.name
# If we're not trying to match a file directly,
# serve from the folder
if not static.resource_type:
if not path.isfile(file_or_directory):
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "dir":
if path.isfile(file_or_directory):
raise TypeError(
"Resource type improperly identified as directory. "
f"'{file_or_directory}'"
)
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "file" and not path.isfile(
file_or_directory
):
raise TypeError(
"Resource type improperly identified as file. "
f"'{file_or_directory}'"
)
elif static.resource_type != "file":
raise ValueError(
"The resource_type should be set to 'file' or 'dir'"
)
# special prefix for static files
# if not static.name.startswith("_static_"):
# name = f"_static_{static.name}"
_handler = wraps(self._static_request_handler)(
partial(
self._static_request_handler,
file_or_directory,
static.use_modified_since,
static.use_content_range,
static.stream_large_files,
content_type=static.content_type,
autoindex=static.autoindex,
index_name=static.index_name,
)
)
route, _ = self.route( # type: ignore
uri=uri,
methods=["GET", "HEAD"],
name=name,
host=static.host,
strict_slashes=static.strict_slashes,
static=True,
)(_handler)
return route
def _determine_error_format(self, handler) -> str:
with suppress(OSError, TypeError):
src = dedent(getsource(handler))

View File

@ -1109,7 +1109,6 @@ class StartupMixin(metaclass=SanicMeta):
app: StartupMixin,
server_info: ApplicationServerInfo,
) -> None: # no cov
try:
# We should never get to this point without a server
# This is primarily to keep mypy happy

348
sanic/mixins/static.py Normal file
View File

@ -0,0 +1,348 @@
from email.utils import formatdate
from functools import partial, wraps
from mimetypes import guess_type
from os import PathLike, path
from pathlib import Path, PurePath
from typing import Optional, Sequence, Set, Union, cast
from urllib.parse import unquote
from sanic_routing.route import Route
from sanic.base.meta import SanicMeta
from sanic.compat import stat_async
from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE
from sanic.exceptions import FileNotFound, HeaderNotFound, RangeNotSatisfiable
from sanic.handlers import ContentRangeHandler
from sanic.handlers.directory import DirectoryHandler
from sanic.log import deprecation, error_logger
from sanic.mixins.base import BaseMixin
from sanic.models.futures import FutureStatic
from sanic.request import Request
from sanic.response import HTTPResponse, file, file_stream, validate_file
class StaticMixin(BaseMixin, metaclass=SanicMeta):
def __init__(self, *args, **kwargs) -> None:
self._future_statics: Set[FutureStatic] = set()
def _apply_static(self, static: FutureStatic) -> Route:
raise NotImplementedError # noqa
def static(
self,
uri: str,
file_or_directory: Union[PathLike, str, bytes],
pattern: str = r"/?.+",
use_modified_since: bool = True,
use_content_range: bool = False,
stream_large_files: Union[bool, int] = False,
name: str = "static",
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
content_type: Optional[str] = None,
apply: bool = True,
resource_type: Optional[str] = None,
index: Optional[Union[str, Sequence[str]]] = None,
directory_view: bool = False,
directory_handler: Optional[DirectoryHandler] = None,
):
"""
Register a root to serve files from. The input can either be a
file or a directory. This method will enable an easy and simple way
to setup the :class:`Route` necessary to serve the static files.
:param uri: URL path to be used for serving static content
:param file_or_directory: Path for the Static file/directory with
static files
:param pattern: Regex Pattern identifying the valid static files
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the
:func:`StreamingHTTPResponse.file_stream` handler rather
than the :func:`HTTPResponse.file` handler to send the file.
If this is an integer, this represents the threshold size to
switch to :func:`StreamingHTTPResponse.file_stream`
:param name: user defined name used for url_for
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param content_type: user defined content type for header
:param apply: If true, will register the route immediately
:param resource_type: Explicitly declare a resource to be a "
file" or a "dir"
:param index: When exposing against a directory, index is the name that
will be served as the default file. When multiple files names are
passed, then they will be tried in order.
:param directory_view: Whether to fallback to showing the directory
viewer when exposing a directory
:param directory_handler: An instance of :class:`DirectoryHandler`
that can be used for explicitly controlling and subclassing the
behavior of the default directory handler
:return: routes registered on the router
:rtype: List[sanic.router.Route]
"""
name = self._generate_name(name)
if strict_slashes is None and self.strict_slashes is not None:
strict_slashes = self.strict_slashes
if not isinstance(file_or_directory, (str, bytes, PurePath)):
raise ValueError(
f"Static route must be a valid path, not {file_or_directory}"
)
if isinstance(file_or_directory, bytes):
deprecation(
"Serving a static directory with a bytes string is "
"deprecated and will be removed in v22.9.",
22.9,
)
file_or_directory = cast(str, file_or_directory.decode())
file_or_directory = Path(file_or_directory)
if directory_handler and (directory_view or index):
raise ValueError(
"When explicitly setting directory_handler, you cannot "
"set either directory_view or index. Instead, pass "
"these arguments to your DirectoryHandler instance."
)
if not directory_handler:
directory_handler = DirectoryHandler(
uri=uri,
directory=file_or_directory,
directory_view=directory_view,
index=index,
)
static = FutureStatic(
uri,
file_or_directory,
pattern,
use_modified_since,
use_content_range,
stream_large_files,
name,
host,
strict_slashes,
content_type,
resource_type,
directory_handler,
)
self._future_statics.add(static)
if apply:
self._apply_static(static)
class StaticHandleMixin(metaclass=SanicMeta):
def _apply_static(self, static: FutureStatic) -> Route:
return self._register_static(static)
def _register_static(
self,
static: FutureStatic,
):
# TODO: Though sanic is not a file server, I feel like we should
# at least make a good effort here. Modified-since is nice, but
# we could also look into etags, expires, and caching
"""
Register a static directory handler with Sanic by adding a route to the
router and registering a handler.
"""
if isinstance(static.file_or_directory, bytes):
file_or_directory = static.file_or_directory.decode("utf-8")
elif isinstance(static.file_or_directory, PurePath):
file_or_directory = str(static.file_or_directory)
elif not isinstance(static.file_or_directory, str):
raise ValueError("Invalid file path string.")
else:
file_or_directory = static.file_or_directory
uri = static.uri
name = static.name
# If we're not trying to match a file directly,
# serve from the folder
if not static.resource_type:
if not path.isfile(file_or_directory):
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "dir":
if path.isfile(file_or_directory):
raise TypeError(
"Resource type improperly identified as directory. "
f"'{file_or_directory}'"
)
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "file" and not path.isfile(
file_or_directory
):
raise TypeError(
"Resource type improperly identified as file. "
f"'{file_or_directory}'"
)
elif static.resource_type != "file":
raise ValueError(
"The resource_type should be set to 'file' or 'dir'"
)
# special prefix for static files
# if not static.name.startswith("_static_"):
# name = f"_static_{static.name}"
_handler = wraps(self._static_request_handler)(
partial(
self._static_request_handler,
file_or_directory=file_or_directory,
use_modified_since=static.use_modified_since,
use_content_range=static.use_content_range,
stream_large_files=static.stream_large_files,
content_type=static.content_type,
directory_handler=static.directory_handler,
)
)
route, _ = self.route( # type: ignore
uri=uri,
methods=["GET", "HEAD"],
name=name,
host=static.host,
strict_slashes=static.strict_slashes,
static=True,
)(_handler)
return route
async def _static_request_handler(
self,
request: Request,
*,
file_or_directory: PathLike,
use_modified_since: bool,
use_content_range: bool,
stream_large_files: Union[bool, int],
directory_handler: DirectoryHandler,
content_type: Optional[str] = None,
__file_uri__: Optional[str] = None,
):
not_found = FileNotFound(
"File not found",
path=file_or_directory,
relative_url=__file_uri__,
)
# Merge served directory and requested file if provided
file_path = await self._get_file_path(
file_or_directory, __file_uri__, not_found
)
try:
headers = {}
# Check if the client has been sent this file before
# and it has not been modified since
stats = None
if use_modified_since:
stats = await stat_async(file_path)
modified_since = stats.st_mtime
response = await validate_file(request.headers, modified_since)
if response:
return response
headers["Last-Modified"] = formatdate(
modified_since, usegmt=True
)
_range = None
if use_content_range:
_range = None
if not stats:
stats = await stat_async(file_path)
headers["Accept-Ranges"] = "bytes"
headers["Content-Length"] = str(stats.st_size)
if request.method != "HEAD":
try:
_range = ContentRangeHandler(request, stats)
except HeaderNotFound:
pass
else:
del headers["Content-Length"]
headers.update(_range.headers)
if "content-type" not in headers:
content_type = (
content_type
or guess_type(file_path)[0]
or DEFAULT_HTTP_CONTENT_TYPE
)
if "charset=" not in content_type and (
content_type.startswith("text/")
or content_type == "application/javascript"
):
content_type += "; charset=utf-8"
headers["Content-Type"] = content_type
if request.method == "HEAD":
return HTTPResponse(headers=headers)
else:
if stream_large_files:
if isinstance(stream_large_files, bool):
threshold = 1024 * 1024
else:
threshold = stream_large_files
if not stats:
stats = await stat_async(file_path)
if stats.st_size >= threshold:
return await file_stream(
file_path, headers=headers, _range=_range
)
return await file(file_path, headers=headers, _range=_range)
except (IsADirectoryError, PermissionError):
return await directory_handler.handle(request, request.path)
except RangeNotSatisfiable:
raise
except FileNotFoundError:
raise not_found
except Exception:
error_logger.exception(
"Exception in static request handler: "
f"path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise
async def _get_file_path(self, file_or_directory, __file_uri__, not_found):
file_path_raw = Path(unquote(file_or_directory))
root_path = file_path = file_path_raw.resolve()
if __file_uri__:
# Strip all / that in the beginning of the URL to help prevent
# python from herping a derp and treating the uri as an
# absolute path
unquoted_file_uri = unquote(__file_uri__).lstrip("/")
file_path_raw = Path(file_or_directory, unquoted_file_uri)
file_path = file_path_raw.resolve()
if (
file_path < root_path and not file_path_raw.is_symlink()
) or ".." in file_path_raw.parts:
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
try:
file_path.relative_to(root_path)
except ValueError:
if not file_path_raw.is_symlink():
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
return file_path

View File

@ -1,6 +1,7 @@
from pathlib import PurePath
from pathlib import Path
from typing import Dict, Iterable, List, NamedTuple, Optional, Union
from sanic.handlers.directory import DirectoryHandler
from sanic.models.handler_types import (
ErrorMiddlewareType,
ListenerType,
@ -46,18 +47,17 @@ class FutureException(NamedTuple):
class FutureStatic(NamedTuple):
uri: str
file_or_directory: Union[str, bytes, PurePath]
file_or_directory: Path
pattern: str
use_modified_since: bool
use_content_range: bool
stream_large_files: bool
stream_large_files: Union[bool, int]
name: str
host: Optional[str]
strict_slashes: Optional[bool]
content_type: Optional[bool]
content_type: Optional[str]
resource_type: Optional[str]
autoindex: bool
index_name: str
directory_handler: DirectoryHandler
class FutureSignal(NamedTuple):

View File

@ -1,88 +0,0 @@
from textwrap import dedent
from typing import Iterable, TypedDict
from html5tagger import E
from .base import BasePage
class FileInfo(TypedDict):
icon: str
file_name: str
file_access: str
file_size: str
class AutoIndex(BasePage): # no cov
EXTRA_STYLE = dedent(
f"""
#breadcrumbs a:hover {{ text-decoration: underline; }}
#breadcrumbs .path-0 a {{ text-decoration: none; }}
#breadcrumbs span::after {{
content: "/"; text-decoration: none; padding: 0 0.25em;
}}
#breadcrumbs .path-0 a::before {{ content: "🏠"; }}
#breadcrumbs > span > a {{ color: {BasePage.ACCENT}; }}
main a {{ color: inherit; font-weight: bold; }}
table.autoindex {{ width: 100%; font-family: monospace; }}
table.autoindex tr {{ display: flex; }}
table.autoindex tr:hover {{ background-color: #ddd; }}
table.autoindex td {{ margin: 0 0.5rem; }}
table.autoindex td:first-child {{ flex: 1; }}
table.autoindex td:nth-child(2) {{ text-align: right; }}
table.autoindex td:last-child {{ text-align: right; }}
@media (min-width: 915px) {{
table.autoindex {{ font-size: 1.75vw; }}
}}
@media (min-width: 1600px) {{
table.autoindex {{ font-size: 1.75rem; }}
}}
@media (prefers-color-scheme: dark) {{
table.autoindex tr:hover {{ background-color: #222; }}
}}
"""
)
TITLE = "File Browser"
def __init__(
self, files: Iterable[FileInfo], url: str, debug: bool
) -> None:
super().__init__(debug)
self.files = files
self.url = url
def _body(self) -> None:
with self.doc.main:
self._headline()
files = list(self.files)
if files:
self._file_table(files)
else:
self.doc.p("The folder is empty.")
def _headline(self):
"""Implement a heading with the current path, combined with
breadcrumb links"""
with self.doc.h1(id="breadcrumbs"):
p = self.url.split("/")[:-1]
for i in reversed(range(len(p))):
self.doc.span(class_=f"path-{i}").__enter__()
for i, part in enumerate(p):
path = "/".join(p[: i + 1]) + "/"
self.doc.a(part, href=path)
self.doc.__exit__(None, None, None)
def _file_table(self, files: Iterable[FileInfo]):
with self.doc.table(class_="autoindex container"):
for f in files:
self._file_row(**f)
def _file_row(
self,
icon: str,
file_name: str,
file_access: str,
file_size: str,
):
first = E.span(icon, class_="icon").a(file_name, href=file_name)
self.doc.tr.td(first).td(file_size).td(file_access)

View File

@ -1,49 +1,15 @@
from abc import ABC, abstractmethod
from textwrap import dedent
from html5tagger import HTML, Document
from sanic import __version__ as VERSION
from sanic.application.logo import SVG_LOGO
from sanic.pages.css import CSS
class BasePage(ABC): # no cov
ACCENT = "#ff0d68"
BASE_STYLE = dedent(
"""
html { font: 16px sans-serif; background: #eee; color: #111; }
body { margin: 0; font-size: 1.25rem; }
body > * { padding: 1rem 2vw; }
@media (max-width: 1200px) {
body > * { padding: 0.5rem 1.5vw;}
body { font-size: 1rem; }
}
.container { min-width: 600px; max-width: 1600px; }
header {
background: #111; color: #e1e1e1; border-bottom: 1px solid #272727;
}
header .container {
display: flex; align-items: center; justify-content: space-between;
}
main { padding-bottom: 3rem; }
h1 { text-align: left; }
h2 { margin: 2rem 0 1rem 0; }
a:visited { color: inherit; }
a { text-decoration: none; color: #88f; }
a:hover, a:focus { text-decoration: underline; outline: none; }
#logo { height: 2.5rem; }
table { width: 100%; max-width: 1200px; word-break: break-all; }
#logo { height: 2.75rem; padding: 0.25rem 0; }
.smalltext { font-size: 1rem; }
.nobr { white-space: nowrap; }
span.icon { margin-right: 1rem; }
@media (prefers-color-scheme: dark) {
html { background: #111; color: #ccc; }
}
"""
)
EXTRA_STYLE = ""
class BasePage(ABC, metaclass=CSS): # no cov
TITLE = "Unknown"
CSS: str
def __init__(self, debug: bool = True) -> None:
self.doc = Document(self.TITLE, lang="en")
@ -51,23 +17,35 @@ class BasePage(ABC): # no cov
@property
def style(self) -> str:
return self.BASE_STYLE + self.EXTRA_STYLE
return self.CSS
def render(self) -> str:
self._head()
self._body()
self._foot()
return str(self.doc)
def _head(self) -> None:
self.doc.style(HTML(self.style))
with self.doc.header:
with self.doc.div(class_="container"):
if self.debug:
self.doc(HTML(SVG_LOGO))
self.doc.div(self.TITLE, id="hdrtext")
if self.debug:
self.doc.div(f"Version {VERSION}", id="hdrver")
self.doc.div(self.TITLE)
def _foot(self) -> None:
with self.doc.footer:
self.doc.div("powered by")
with self.doc.div:
self._sanic_logo()
if self.debug:
self.doc.div(f"Version {VERSION}")
@abstractmethod
def _body(self) -> None:
...
def _sanic_logo(self) -> None:
self.doc.a(
HTML(SVG_LOGO),
href="https://sanic.dev",
target="_blank",
referrerpolicy="no-referrer",
)

35
sanic/pages/css.py Normal file
View File

@ -0,0 +1,35 @@
from abc import ABCMeta
from pathlib import Path
from typing import Optional
CURRENT_DIR = Path(__file__).parent
def _extract_style(maybe_style: Optional[str], name: str) -> str:
if maybe_style is not None:
maybe_path = Path(maybe_style)
if maybe_path.exists():
return maybe_path.read_text(encoding="UTF-8")
return maybe_style
maybe_path = CURRENT_DIR / "styles" / f"{name}.css"
if maybe_path.exists():
return maybe_path.read_text(encoding="UTF-8")
return ""
class CSS(ABCMeta):
"""Cascade stylesheets, i.e. combine all ancestor styles"""
def __new__(cls, name, bases, attrs):
Page = super().__new__(cls, name, bases, attrs)
# Use a locally defined STYLE or the one from styles directory
s = _extract_style(attrs.get("STYLE"), name)
Page.STYLE = f"\n/* {name} */\n{s.strip()}\n" if s else ""
# Combine with all ancestor styles
Page.CSS = "".join(
Class.STYLE
for Class in reversed(Page.__mro__)
if type(Class) is CSS
)
return Page

View File

@ -0,0 +1,66 @@
import sys
from typing import Dict, Iterable
from html5tagger import E
from .base import BasePage
if sys.version_info < (3, 8): # no cov
FileInfo = Dict
else:
from typing import TypedDict
class FileInfo(TypedDict):
icon: str
file_name: str
file_access: str
file_size: str
class DirectoryPage(BasePage): # no cov
TITLE = "Directory Viewer"
def __init__(
self, files: Iterable[FileInfo], url: str, debug: bool
) -> None:
super().__init__(debug)
self.files = files
self.url = url
def _body(self) -> None:
with self.doc.main:
self._headline()
files = list(self.files)
if files:
self._file_table(files)
else:
self.doc.p("The folder is empty.")
def _headline(self):
"""Implement a heading with the current path, combined with
breadcrumb links"""
with self.doc.h1(id="breadcrumbs"):
p = self.url.split("/")[:-1]
for i, part in enumerate(p):
path = "/".join(p[: i + 1]) + "/"
with self.doc.a(href=path):
self.doc.span(part, class_="dir").span("/", class_="sep")
def _file_table(self, files: Iterable[FileInfo]):
with self.doc.table(class_="autoindex container"):
for f in files:
self._file_row(**f)
def _file_row(
self,
icon: str,
file_name: str,
file_access: str,
file_size: str,
):
first = E.span(icon, class_="icon").a(file_name, href=file_name)
self.doc.tr.td(first).td(file_size).td(file_access)

View File

@ -0,0 +1,83 @@
html {
font: 16px sans-serif;
background: #eee;
color: #111;
}
body {
margin: 0;
font-size: 1.25rem;
}
body>* {
padding: 1rem 2vw;
}
@media (max-width: 1200px) {
body>* {
padding: 0.5rem 1.5vw;
}
body {
font-size: 1rem;
}
}
.smalltext {
font-size: 1.0rem;
}
.container {
min-width: 600px;
max-width: 1600px;
}
header {
background: #111;
color: #e1e1e1;
border-bottom: 1px solid #272727;
text-align: center;
}
footer {
text-align: center;
display: flex;
flex-direction: column;
font-size: 0.8rem;
margin-top: 2rem;
}
h1 {
text-align: left;
}
a:visited {
color: inherit;
}
a {
text-decoration: none;
color: #88f;
}
a:hover,
a:focus {
text-decoration: underline;
outline: none;
}
#logo {
height: 1.75rem;
padding: 0 0.25rem;
}
span.icon {
margin-right: 1rem;
}
@media (prefers-color-scheme: dark) {
html {
background: #111;
color: #ccc;
}
}

View File

@ -0,0 +1,62 @@
#breadcrumbs>a:hover {
text-decoration: none;
}
#breadcrumbs>a .dir {
padding: 0 0.25em;
}
#breadcrumbs>a:first-child:hover::before,
#breadcrumbs>a .dir:hover {
text-decoration: underline;
}
#breadcrumbs>a:first-child::before {
content: "🏠";
}
#breadcrumbs>a:last-child {
color: #ff0d68;
}
main a {
color: inherit;
font-weight: bold;
}
table.autoindex {
width: 100%;
font-family: monospace;
font-size: 1.25rem;
}
table.autoindex tr {
display: flex;
}
table.autoindex tr:hover {
background-color: #ddd;
}
table.autoindex td {
margin: 0 0.5rem;
}
table.autoindex td:first-child {
flex: 1;
}
table.autoindex td:nth-child(2) {
text-align: right;
}
table.autoindex td:last-child {
text-align: right;
}
@media (prefers-color-scheme: dark) {
table.autoindex tr:hover {
background-color: #222;
}
}

View File

@ -0,0 +1,140 @@
/* ErrorPage */
summary { color: #888; }
/* TraceRite CSS: read from tracerite.html.style: */
:root {
--tracerite-var: #8af;
--tracerite-type: #5c8;
--tracerite-val: #8af;
--tracerite-tab: #aaaa;
}
/* CSS reset */
.tracerite,
.tracerite * {
margin: 0;
padding: 0;
outline: none;
box-sizing: border-box;
overflow: hidden;
}
.tracerite h3 {
margin: 0;
padding: .2em 0;
font-size: 1.2em;
}
.tracerite p {
margin: 0;
}
.tracerite pre {
width: 100%;
padding: .5em;
}
.tracerite .codeline {
text-indent: 4ch each-line;
}
.tracerite .codeline::before {
content: attr(data-lineno);
color: #888;
opacity: 0.0;
transition: all 0.4s;
display: inline-block;
text-align: right;
text-indent: 0;
white-space: nowrap;
word-break: keep-all;
padding-right: 1ch;
width: 4ch;
}
.tracerite pre:hover .codeline::before {
opacity: 1.0;
}
.tracerite mark {
background: none;
}
.tracerite mark span {
background: #ff0;
}
.tracerite mark::after {
display: inline-block;
content: attr(data-symbol);
margin: -1ch;
transform: translate(2em, 0) scale(1.8);
color: #ff0;
-webkit-text-stroke: .05em black;
}
.tracerite .excmessage {
max-height: 12em;
overflow: auto;
}
.tracerite .exctype {color: gray}
.tracerite .traceback-labels {
display: flex;
align-items: center;
border-bottom: 3px solid var(--tracerite-tab);
margin-top: 0.3em;
}
.tracerite .traceback-labels button {
background: var(--tracerite-tab); color: black;
border: 0;
border-radius: .5em .5em 0 0;
flex-shrink: 1;
line-height: 1.0;
padding: .5em;
margin-right: .2em;
}
.tracerite .traceback-labels button:hover { background: #ddd }
.tracerite .traceback-labels * {
white-space: nowrap;
text-overflow: ellipsis;
overflow: hidden;
}
.tracerite .traceback-tabs .content {
scroll-snap-type: x mandatory;
display: flex;
align-items: flex-start;
overflow-x: auto;
scrollbar-width: none;
}
.tracerite .traceback-tabs .content::-webkit-scrollbar {
width: 0;
height: 0;
}
.tracerite .traceback-details {
min-width: 20ch;
max-width: 100%;
margin: 0 .3em;
flex-shrink: 0;
scroll-snap-align: start;
border-radius: .5em;
padding: .2em;
}
.tracerite .traceback-details:last-child {
width: 100%;
}
.tracerite table.inspector { margin-left: 3.5em; width: auto; word-break: break-word;) }
.tracerite .inspector tbody tr,
.tracerite .inspector th,
.tracerite .inspector td {
padding: 0;
background: none;
text-align: left;
max-width: 20em;
text-overflow: ellipsis;
overflow: hidden;
}
.tracerite .inspector .var { font-weight: bold; color: var(--tracerite-var) }
.tracerite .inspector .type { white-space: nowrap; color: var(--tracerite-type) }
.tracerite .inspector .val { white-space: pre-wrap; color: var(--tracerite-val) }
/* matrix value on a variable */
.tracerite .inspector table td {
color: var(--tracerite-val);
min-width: 3em;
word-break: keep-all;
overflow: hidden;
padding: 0;
font-size: 0.8em;
border-collapse: collapse;
text-align: right;
}

View File

@ -146,7 +146,6 @@ class Request:
head: bytes = b"",
stream_id: int = 0,
):
self.raw_url = url_bytes
try:
self._parsed_url = parse_url(url_bytes)

View File

@ -4,14 +4,13 @@ from datetime import datetime, timezone
from email.utils import formatdate, parsedate_to_datetime
from mimetypes import guess_type
from os import path
from pathlib import Path, PurePath
from pathlib import PurePath
from time import time
from typing import Any, AnyStr, Callable, Dict, Optional, Union
from urllib.parse import quote_plus
from sanic.compat import Header, open_async, stat_async
from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE, DEFAULT_INDEX
from sanic.exceptions import SanicIsADirectoryError
from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE
from sanic.helpers import Default, _default
from sanic.log import logger
from sanic.models.protocol_types import HTMLProtocol, Range
@ -165,8 +164,6 @@ async def file(
max_age: Optional[Union[float, int]] = None,
no_store: Optional[bool] = None,
_range: Optional[Range] = None,
autoindex: bool = False,
index_name: Union[str, Default] = _default,
) -> HTTPResponse:
"""Return a response object with file data.
:param status: HTTP response code. Won't enforce the passed in
@ -229,26 +226,16 @@ async def file(
filename = filename or path.split(location)[-1]
try:
async with await open_async(location, mode="rb") as f:
if _range:
await f.seek(_range.start)
out_stream = await f.read(_range.size)
headers[
"Content-Range"
] = f"bytes {_range.start}-{_range.end}/{_range.total}"
status = 206
else:
out_stream = await f.read()
except IsADirectoryError as e:
if isinstance(index_name, Default):
index_name = DEFAULT_INDEX
exc = SanicIsADirectoryError(str(e))
exc.location = Path(location)
exc.autoindex = autoindex
exc.index_name = index_name
raise exc
async with await open_async(location, mode="rb") as f:
if _range:
await f.seek(_range.start)
out_stream = await f.read(_range.size)
headers[
"Content-Range"
] = f"bytes {_range.start}-{_range.end}/{_range.total}"
status = 206
else:
out_stream = await f.read()
mime_type = mime_type or guess_type(filename)[0] or "text/plain"
return HTTPResponse(

View File

@ -94,7 +94,6 @@ def watchdog(sleep_interval, reload_dirs):
try:
while True:
changed = set()
for filename in itertools.chain(
_iter_module_files(),

View File

@ -52,7 +52,6 @@ class WebsocketFrameAssembler:
paused: bool
def __init__(self, protocol) -> None:
self.protocol = protocol
self.read_mutex = asyncio.Lock()

View File

@ -686,7 +686,6 @@ class WebsocketImplProtocol:
:raises TypeError: for unsupported inputs
"""
async with self.conn_mutex:
if self.ws_proto.state in (CLOSED, CLOSING):
raise WebsocketClosed(
"Cannot write to websocket interface after it is closed."

View File

@ -12,7 +12,7 @@ def create_simple_server(directory: Path):
app = Sanic("SimpleServer")
app.static(
"/", directory, name="main", autoindex=True, index_name="index.html"
"/", directory, name="main", directory_view=True, index="index.html"
)
return app

View File

@ -11,7 +11,6 @@ class TouchUpMeta(SanicMeta):
methods = attrs.get("__touchup__")
attrs["__touched__"] = False
if methods:
for method in methods:
if method not in attrs:
raise SanicException(

View File

@ -75,7 +75,6 @@ def load_module_from_file_location(
location = location.decode(encoding)
if isinstance(location, Path) or "/" in location or "$" in location:
if not isinstance(location, Path):
# A) Check if location contains any environment variables
# in format ${some_env_var}.

View File

@ -35,6 +35,7 @@ def open_local(paths, mode="r", encoding="utf8"):
return codecs.open(path, mode, encoding)
def str_to_bool(val: str) -> bool:
val = val.lower()
if val in {
@ -55,6 +56,7 @@ def str_to_bool(val: str) -> bool:
else:
raise ValueError(f"Invalid truth value {val}")
with open_local(["sanic", "__version__.py"], encoding="latin1") as fp:
try:
version = re.findall(
@ -79,7 +81,7 @@ setup_kwargs = {
),
"long_description": long_description,
"packages": find_packages(exclude=("tests", "tests.*")),
"package_data": {"sanic": ["py.typed"]},
"package_data": {"sanic": ["py.typed", "pages/styles/*"]},
"platforms": "any",
"python_requires": ">=3.7",
"classifiers": [
@ -109,6 +111,7 @@ requirements = [
"aiofiles>=0.6.0",
"websockets>=10.0",
"multidict>=5.0,<7.0",
"html5tagger>=1.2.1",
]
tests_require = [

View File

@ -1,5 +1,7 @@
import asyncio
import inspect
import logging
import os
import random
import re
import string
@ -58,7 +60,6 @@ CACHE: Dict[str, Any] = {}
class RouteStringGenerator:
ROUTE_COUNT_PER_DEPTH = 100
HTTP_METHODS = HTTP_METHODS
ROUTE_PARAM_TYPES = ["str", "int", "float", "alpha", "uuid"]
@ -232,3 +233,12 @@ def urlopen():
urlopen.read = Mock()
with patch("sanic.cli.inspector_client.urlopen", urlopen):
yield urlopen
@pytest.fixture(scope="module")
def static_file_directory():
"""The static directory to serve"""
current_file = inspect.getfile(inspect.currentframe())
current_directory = os.path.dirname(os.path.abspath(current_file))
static_directory = os.path.join(current_directory, "static")
return static_directory

View File

@ -36,6 +36,7 @@ def test_app_loop_running(app: Sanic):
assert response.text == "pass"
@pytest.mark.asyncio
def test_create_asyncio_server(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(return_asyncio_server=True)
@ -44,6 +45,7 @@ def test_create_asyncio_server(app: Sanic):
assert srv.is_serving() is True
@pytest.mark.asyncio
def test_asyncio_server_no_start_serving(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
@ -55,6 +57,7 @@ def test_asyncio_server_no_start_serving(app: Sanic):
assert srv.is_serving() is False
@pytest.mark.asyncio
def test_asyncio_server_start_serving(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
@ -72,6 +75,7 @@ def test_asyncio_server_start_serving(app: Sanic):
# Looks like we can't easily test `serve_forever()`
@pytest.mark.asyncio
def test_create_server_main(app: Sanic, caplog):
app.listener("main_process_start")(lambda *_: ...)
loop = asyncio.get_event_loop()
@ -86,6 +90,7 @@ def test_create_server_main(app: Sanic, caplog):
) in caplog.record_tuples
@pytest.mark.asyncio
def test_create_server_no_startup(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
@ -101,6 +106,7 @@ def test_create_server_no_startup(app: Sanic):
loop.run_until_complete(srv.start_serving())
@pytest.mark.asyncio
def test_create_server_main_convenience(app: Sanic, caplog):
app.main_process_start(lambda *_: ...)
loop = asyncio.get_event_loop()
@ -126,7 +132,6 @@ def test_app_loop_not_running(app: Sanic):
def test_app_run_raise_type_error(app: Sanic):
with pytest.raises(TypeError) as excinfo:
app.run(loop="loop")
@ -139,7 +144,6 @@ def test_app_run_raise_type_error(app: Sanic):
def test_app_route_raise_value_error(app: Sanic):
with pytest.raises(ValueError) as excinfo:
@app.route("/test")
@ -221,7 +225,6 @@ def test_app_websocket_parameters(websocket_protocol_mock, app: Sanic):
def test_handle_request_with_nested_exception(app: Sanic, monkeypatch):
err_msg = "Mock Exception"
def mock_error_handler_response(*args, **kwargs):
@ -241,7 +244,6 @@ def test_handle_request_with_nested_exception(app: Sanic, monkeypatch):
def test_handle_request_with_nested_exception_debug(app: Sanic, monkeypatch):
err_msg = "Mock Exception"
def mock_error_handler_response(*args, **kwargs):
@ -470,6 +472,7 @@ def test_uvloop_config(app: Sanic, monkeypatch, use):
try_use_uvloop.assert_not_called()
@pytest.mark.asyncio
def test_uvloop_cannot_never_called_with_create_server(caplog, monkeypatch):
apps = (Sanic("default-uvloop"), Sanic("no-uvloop"), Sanic("yes-uvloop"))
@ -506,6 +509,7 @@ def test_uvloop_cannot_never_called_with_create_server(caplog, monkeypatch):
assert counter[(logging.WARNING, message)] == modified
@pytest.mark.asyncio
def test_multiple_uvloop_configs_display_warning(caplog):
Sanic._uvloop_setting = None # Reset the setting (changed in prev tests)

View File

@ -148,7 +148,6 @@ def test_cookie_set_unknown_property():
def test_cookie_set_same_key(app):
cookies = {"test": "wait"}
@app.get("/")

View File

@ -62,7 +62,6 @@ def exception_handler_app():
@exception_handler_app.route("/8", error_format="html")
def handler_8(request):
raise ErrorWithRequestCtx("OK")
@exception_handler_app.exception(ErrorWithRequestCtx, NotFound)
@ -214,7 +213,7 @@ def test_error_handler_noisy_log(
exception_handler_app: Sanic, monkeypatch: MonkeyPatch
):
err_logger = Mock()
monkeypatch.setattr(handlers, "error_logger", err_logger)
monkeypatch.setattr(handlers.error, "error_logger", err_logger)
exception_handler_app.config["NOISY_EXCEPTIONS"] = False
exception_handler_app.test_client.get("/1")

View File

@ -514,7 +514,6 @@ def test_file_stream_head_response(
def test_file_stream_response_range(
app: Sanic, file_name, static_file_directory, size, start, end
):
Range = namedtuple("Range", ["size", "start", "end", "total"])
total = len(get_file_content(static_file_directory, file_name))
range = Range(size=size, start=start, end=end, total=total)

View File

@ -722,7 +722,6 @@ def test_add_webscoket_route_with_version(app):
def test_route_duplicate(app):
with pytest.raises(RouteExists):
@app.route("/test")
@ -819,7 +818,6 @@ def test_unquote_add_route(app, unquote):
def test_dynamic_add_route(app):
results = []
async def handler(request, name):
@ -834,7 +832,6 @@ def test_dynamic_add_route(app):
def test_dynamic_add_route_string(app):
results = []
async def handler(request, name):
@ -938,7 +935,6 @@ def test_dynamic_add_route_unhashable(app):
def test_add_route_duplicate(app):
with pytest.raises(RouteExists):
async def handler1(request):
@ -1120,7 +1116,6 @@ def test_route_raise_ParameterNameConflicts(app):
def test_route_invalid_host(app):
host = 321
with pytest.raises(ValueError) as excinfo:

View File

@ -93,6 +93,7 @@ def test_dont_register_system_signals(app):
@pytest.mark.skipif(os.name == "nt", reason="windows cannot SIGINT processes")
def test_windows_workaround():
"""Test Windows workaround (on any other OS)"""
# At least some code coverage, even though this test doesn't work on
# Windows...
class MockApp:

View File

@ -1,4 +1,3 @@
import inspect
import logging
import os
import sys
@ -13,15 +12,6 @@ from sanic import Sanic, text
from sanic.exceptions import FileNotFound
@pytest.fixture(scope="module")
def static_file_directory():
"""The static directory to serve"""
current_file = inspect.getfile(inspect.currentframe())
current_directory = os.path.dirname(os.path.abspath(current_file))
static_directory = os.path.join(current_directory, "static")
return static_directory
@pytest.fixture(scope="module")
def double_dotted_directory_file(static_file_directory: str):
"""Generate double dotted directory and its files"""
@ -118,7 +108,12 @@ def test_static_file_pathlib(app, static_file_directory, file_name):
def test_static_file_bytes(app, static_file_directory, file_name):
bsep = os.path.sep.encode("utf-8")
file_path = static_file_directory.encode("utf-8") + bsep + file_name
app.static("/testing.file", file_path)
message = (
"Serving a static directory with a bytes "
"string is deprecated and will be removed in v22.9."
)
with pytest.warns(DeprecationWarning, match=message):
app.static("/testing.file", file_path)
request, response = app.test_client.get("/testing.file")
assert response.status == 200
@ -431,7 +426,6 @@ def test_static_stream_large_file(
"file_name", ["test.file", "decode me.txt", "python.png"]
)
def test_use_modified_since(app, static_file_directory, file_name):
file_stat = os.stat(get_file_path(static_file_directory, file_name))
modified_since = strftime(
"%a, %d %b %Y %H:%M:%S GMT", gmtime(file_stat.st_mtime)

View File

@ -0,0 +1,123 @@
import os
from pathlib import Path
import pytest
from sanic import Sanic
from sanic.handlers.directory import DirectoryHandler
def get_file_path(static_file_directory, file_name):
return os.path.join(static_file_directory, file_name)
def get_file_content(static_file_directory, file_name):
"""The content of the static file to check"""
with open(get_file_path(static_file_directory, file_name), "rb") as file:
return file.read()
def test_static_directory_view(app: Sanic, static_file_directory: str):
app.static("/static", static_file_directory, directory_view=True)
_, response = app.test_client.get("/static/")
assert response.status == 200
assert response.content_type == "text/html; charset=utf-8"
assert "<title>Directory Viewer</title>" in response.text
def test_static_index_single(app: Sanic, static_file_directory: str):
app.static("/static", static_file_directory, index="test.html")
_, response = app.test_client.get("/static/")
assert response.status == 200
assert response.body == get_file_content(
static_file_directory, "test.html"
)
assert response.headers["Content-Type"] == "text/html"
def test_static_index_single_not_found(app: Sanic, static_file_directory: str):
app.static("/static", static_file_directory, index="index.html")
_, response = app.test_client.get("/static/")
assert response.status == 404
def test_static_index_multiple(app: Sanic, static_file_directory: str):
app.static(
"/static",
static_file_directory,
index=["index.html", "test.html"],
)
_, response = app.test_client.get("/static/")
assert response.status == 200
assert response.body == get_file_content(
static_file_directory, "test.html"
)
assert response.headers["Content-Type"] == "text/html"
def test_static_directory_view_and_index(
app: Sanic, static_file_directory: str
):
app.static(
"/static",
static_file_directory,
directory_view=True,
index="foo.txt",
)
_, response = app.test_client.get("/static/nested/")
assert response.status == 200
assert response.content_type == "text/html; charset=utf-8"
assert "<title>Directory Viewer</title>" in response.text
_, response = app.test_client.get("/static/nested/dir/")
assert response.status == 200
assert response.body == get_file_content(
f"{static_file_directory}/nested/dir", "foo.txt"
)
assert response.content_type == "text/plain"
def test_static_directory_handler(app: Sanic, static_file_directory: str):
dh = DirectoryHandler(
"/static",
Path(static_file_directory),
directory_view=True,
index="foo.txt",
)
app.static("/static", static_file_directory, directory_handler=dh)
_, response = app.test_client.get("/static/nested/")
assert response.status == 200
assert response.content_type == "text/html; charset=utf-8"
assert "<title>Directory Viewer</title>" in response.text
_, response = app.test_client.get("/static/nested/dir/")
assert response.status == 200
assert response.body == get_file_content(
f"{static_file_directory}/nested/dir", "foo.txt"
)
assert response.content_type == "text/plain"
def test_static_directory_handler_fails(app: Sanic):
dh = DirectoryHandler(
"/static",
Path(""),
directory_view=True,
index="foo.txt",
)
message = (
"When explicitly setting directory_handler, you cannot "
"set either directory_view or index. Instead, pass "
"these arguments to your DirectoryHandler instance."
)
with pytest.raises(ValueError, match=message):
app.static("/static", "", directory_handler=dh, directory_view=True)
with pytest.raises(ValueError, match=message):
app.static("/static", "", directory_handler=dh, index="index.html")

View File

@ -654,7 +654,6 @@ def test_sanic_ssl_context_create():
reason="This test requires fork context",
)
def test_ssl_in_multiprocess_mode(app: Sanic, caplog):
ssl_dict = {"cert": localhost_cert, "key": localhost_key}
event = Event()

View File

@ -176,7 +176,6 @@ def handler(request: Request):
async def client(app: Sanic, loop: AbstractEventLoop):
try:
transport = httpx.AsyncHTTPTransport(uds=SOCKPATH)
async with httpx.AsyncClient(transport=transport) as client:
r = await client.get("http://myhost.invalid/")

View File

@ -83,7 +83,6 @@ def test_simple_url_for_getting_with_more_params(app, args, url):
def test_url_for_with_server_name(app):
server_name = f"{test_host}:{test_port}"
app.config.update({"SERVER_NAME": server_name})
path = "/myurl"

View File

@ -38,7 +38,6 @@ def test_load_module_from_file_location_with_non_existing_env_variable():
LoadFileException,
match="The following environment variables are not set: MuuMilk",
):
load_module_from_file_location("${MuuMilk}")