Establish basic file browser and index fallback (#2662)

Co-authored-by: L. Kärkkäinen <98187+Tronic@users.noreply.github.com>
Co-authored-by: L. Karkkainen <tronic@users.noreply.github.com>
This commit is contained in:
Adam Hopkins 2023-02-05 15:09:04 +02:00 committed by GitHub
parent 30c53b6857
commit 9cb9e88678
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
45 changed files with 1031 additions and 477 deletions

View File

@ -9,6 +9,7 @@ omit =
sanic/simple.py
sanic/utils.py
sanic/cli
sanic/pages
[html]
directory = coverage

View File

@ -17,7 +17,8 @@ ignore:
- "sanic/compat.py"
- "sanic/simple.py"
- "sanic/utils.py"
- "sanic/cli"
- "sanic/cli/"
- "sanic/pages/"
- ".github/"
- "changelogs/"
- "docker/"

View File

@ -23,5 +23,6 @@ module = [
"trustme.*",
"sanic_routing.*",
"aioquic.*",
"html5tagger.*",
]
ignore_missing_imports = true

View File

@ -72,6 +72,7 @@ from sanic.log import (
from sanic.middleware import Middleware, MiddlewareLocation
from sanic.mixins.listeners import ListenerEvent
from sanic.mixins.startup import StartupMixin
from sanic.mixins.static import StaticHandleMixin
from sanic.models.futures import (
FutureException,
FutureListener,
@ -79,7 +80,6 @@ from sanic.models.futures import (
FutureRegistry,
FutureRoute,
FutureSignal,
FutureStatic,
)
from sanic.models.handler_types import ListenerType, MiddlewareType
from sanic.models.handler_types import Sanic as SanicVar
@ -106,7 +106,7 @@ if OS_IS_WINDOWS: # no cov
enable_windows_color_support()
class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
class Sanic(StaticHandleMixin, BaseSanic, StartupMixin, metaclass=TouchUpMeta):
"""
The main application instance
"""
@ -441,9 +441,6 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
return routes
def _apply_static(self, static: FutureStatic) -> Route:
return self._register_static(static)
def _apply_middleware(
self,
middleware: FutureMiddleware,
@ -890,11 +887,11 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
Union[
BaseHTTPResponse,
Coroutine[Any, Any, Optional[BaseHTTPResponse]],
ResponseStream,
]
] = None
run_middleware = True
try:
await self.dispatch(
"http.routing.before",
inline=True,
@ -926,7 +923,6 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
and request.stream.request_body
and not route.extra.ignore_body
):
if hasattr(handler, "is_stream"):
# Streaming handler: lift the size limit
request.stream.request_max_size = float("inf")
@ -1000,7 +996,7 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
...
await response.send(end_stream=True)
elif isinstance(response, ResponseStream):
resp = await response(request) # type: ignore
resp = await response(request)
await self.dispatch(
"http.lifecycle.response",
inline=True,
@ -1009,7 +1005,7 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
"response": resp,
},
)
await response.eof() # type: ignore
await response.eof()
else:
if not hasattr(handler, "is_websocket"):
raise ServerError(

View File

@ -40,6 +40,8 @@ FULL_COLOR_LOGO = """
""" # noqa
SVG_LOGO = """<svg id=logo alt=Sanic viewBox="0 0 964 279"><path d="M107 222c9-2 10-20 1-22s-20-2-30-2-17 7-16 14 6 10 15 10h30zm115-1c16-2 30-11 35-23s6-24 2-33-6-14-15-20-24-11-38-10c-7 3-10 13-5 19s17-1 24 4 15 14 13 24-5 15-14 18-50 0-74 0h-17c-6 4-10 15-4 20s16 2 23 3zM251 83q9-1 9-7 0-15-10-16h-13c-10 6-10 20 0 22zM147 60c-4 0-10 3-11 11s5 13 10 12 42 0 67 0c5-3 7-10 6-15s-4-8-9-8zm-33 1c-8 0-16 0-24 3s-20 10-25 20-6 24-4 36 15 22 26 27 78 8 94 3c4-4 4-12 0-18s-69 8-93-10c-8-7-9-23 0-30s12-10 20-10 12 2 16-3 1-15-5-18z" fill="#ff0d68"/><path d="M676 74c0-14-18-9-20 0s0 30 0 39 20 9 20 2zm-297-10c-12 2-15 12-23 23l-41 58H340l22-30c8-12 23-13 30-4s20 24 24 38-10 10-17 10l-68 2q-17 1-48 30c-7 6-10 20 0 24s15-8 20-13 20 -20 58-21h50 c20 2 33 9 52 30 8 10 24-4 16-13L384 65q-3-2-5-1zm131 0c-10 1-12 12-11 20v96c1 10-3 23 5 32s20-5 17-15c0-23-3-46 2-67 5-12 22-14 32-5l103 87c7 5 19 1 18-9v-64c-3-10-20-9-21 2s-20 22-30 13l-97-80c-5-4-10-10-18-10zM701 76v128c2 10 15 12 20 4s0-102 0-124s-20-18-20-7z M850 63c-35 0-69-2-86 15s-20 60-13 66 13 8 16 0 1-10 1-27 12-26 20-32 66-5 85-5 31 4 31-10-18-7-54-7M764 159c-6-2-15-2-16 12s19 37 33 43 23 8 25-4-4-11-11-14q-9-3-22-18c-4-7-3-16-10-19zM828 196c-4 0-8 1-10 5s-4 12 0 15 8 2 12 2h60c5 0 10-2 12-6 3-7-1-16-8-16z" fill="#e1e1e1"/></svg>""" # noqa
ansi_pattern = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

View File

@ -9,6 +9,7 @@ from sanic.mixins.listeners import ListenerMixin
from sanic.mixins.middleware import MiddlewareMixin
from sanic.mixins.routes import RouteMixin
from sanic.mixins.signals import SignalMixin
from sanic.mixins.static import StaticMixin
VALID_NAME = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_\-]*$")
@ -16,6 +17,7 @@ VALID_NAME = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_\-]*$")
class BaseSanic(
RouteMixin,
StaticMixin,
MiddlewareMixin,
ListenerMixin,
ExceptionMixin,

View File

@ -4,7 +4,6 @@ from sanic.compat import UpperStrEnum
class HTTPMethod(UpperStrEnum):
GET = auto()
POST = auto()
PUT = auto()
@ -15,7 +14,6 @@ class HTTPMethod(UpperStrEnum):
class LocalCertCreator(UpperStrEnum):
AUTO = auto()
TRUSTME = auto()
MKCERT = auto()

View File

@ -12,6 +12,7 @@ Setting ``app.config.FALLBACK_ERROR_FORMAT = "auto"`` will enable a switch that
will attempt to provide an appropriate response format based upon the
request type.
"""
from __future__ import annotations
import sys
import typing as t
@ -21,8 +22,7 @@ from traceback import extract_tb
from sanic.exceptions import BadRequest, SanicException
from sanic.helpers import STATUS_CODES
from sanic.request import Request
from sanic.response import HTTPResponse, html, json, text
from sanic.response import html, json, text
dumps: t.Callable[..., str]
@ -33,6 +33,8 @@ try:
except ImportError: # noqa
from json import dumps
if t.TYPE_CHECKING:
from sanic import HTTPResponse, Request
DEFAULT_FORMAT = "auto"
FALLBACK_TEXT = (

View File

@ -0,0 +1,10 @@
from .content_range import ContentRangeHandler
from .directory import DirectoryHandler
from .error import ErrorHandler
__all__ = (
"ContentRangeHandler",
"DirectoryHandler",
"ErrorHandler",
)

View File

@ -0,0 +1,78 @@
from __future__ import annotations
from sanic.exceptions import (
HeaderNotFound,
InvalidRangeType,
RangeNotSatisfiable,
)
class ContentRangeHandler:
"""
A mechanism to parse and process the incoming request headers to
extract the content range information.
:param request: Incoming api request
:param stats: Stats related to the content
:type request: :class:`sanic.request.Request`
:type stats: :class:`posix.stat_result`
:ivar start: Content Range start
:ivar end: Content Range end
:ivar size: Length of the content
:ivar total: Total size identified by the :class:`posix.stat_result`
instance
:ivar ContentRangeHandler.headers: Content range header ``dict``
"""
__slots__ = ("start", "end", "size", "total", "headers")
def __init__(self, request, stats):
self.total = stats.st_size
_range = request.headers.getone("range", None)
if _range is None:
raise HeaderNotFound("Range Header Not Found")
unit, _, value = tuple(map(str.strip, _range.partition("=")))
if unit != "bytes":
raise InvalidRangeType(
"%s is not a valid Range Type" % (unit,), self
)
start_b, _, end_b = tuple(map(str.strip, value.partition("-")))
try:
self.start = int(start_b) if start_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (start_b,), self
)
try:
self.end = int(end_b) if end_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (end_b,), self
)
if self.end is None:
if self.start is None:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
else:
# this case represents `Content-Range: bytes 5-`
self.end = self.total - 1
else:
if self.start is None:
# this case represents `Content-Range: bytes -5`
self.start = self.total - self.end
self.end = self.total - 1
if self.start >= self.end:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
self.size = self.end - self.start + 1
self.headers = {
"Content-Range": "bytes %s-%s/%s"
% (self.start, self.end, self.total)
}
def __bool__(self):
return self.size > 0

View File

@ -0,0 +1,84 @@
from __future__ import annotations
from datetime import datetime
from operator import itemgetter
from pathlib import Path
from stat import S_ISDIR
from typing import Dict, Iterable, Optional, Sequence, Union, cast
from sanic.exceptions import NotFound
from sanic.pages.directory_page import DirectoryPage, FileInfo
from sanic.request import Request
from sanic.response import file, html, redirect
class DirectoryHandler:
def __init__(
self,
uri: str,
directory: Path,
directory_view: bool = False,
index: Optional[Union[str, Sequence[str]]] = None,
) -> None:
if isinstance(index, str):
index = [index]
elif index is None:
index = []
self.base = uri.strip("/")
self.directory = directory
self.directory_view = directory_view
self.index = tuple(index)
async def handle(self, request: Request, path: str):
current = path.strip("/")[len(self.base) :].strip("/") # noqa: E203
for file_name in self.index:
index_file = self.directory / current / file_name
if index_file.is_file():
return await file(index_file)
if self.directory_view:
return self._index(
self.directory / current, path, request.app.debug
)
if self.index:
raise NotFound("File not found")
raise IsADirectoryError(f"{self.directory.as_posix()} is a directory")
def _index(self, location: Path, path: str, debug: bool):
# Remove empty path elements, append slash
if "//" in path or not path.endswith("/"):
return redirect(
"/" + "".join([f"{p}/" for p in path.split("/") if p])
)
# Render file browser
page = DirectoryPage(self._iter_files(location), path, debug)
return html(page.render())
def _prepare_file(self, path: Path) -> Dict[str, Union[int, str]]:
stat = path.stat()
modified = (
datetime.fromtimestamp(stat.st_mtime)
.isoformat()[:19]
.replace("T", " ")
)
is_dir = S_ISDIR(stat.st_mode)
icon = "📁" if is_dir else "📄"
file_name = path.name
if is_dir:
file_name += "/"
return {
"priority": is_dir * -1,
"file_name": file_name,
"icon": icon,
"file_access": modified,
"file_size": stat.st_size,
}
def _iter_files(self, location: Path) -> Iterable[FileInfo]:
prepared = [self._prepare_file(f) for f in location.iterdir()]
for item in sorted(prepared, key=itemgetter("priority", "file_name")):
del item["priority"]
yield cast(FileInfo, item)

View File

@ -3,11 +3,6 @@ from __future__ import annotations
from typing import Dict, List, Optional, Tuple, Type
from sanic.errorpages import BaseRenderer, TextRenderer, exception_response
from sanic.exceptions import (
HeaderNotFound,
InvalidRangeType,
RangeNotSatisfiable,
)
from sanic.log import deprecation, error_logger
from sanic.models.handler_types import RouteHandler
from sanic.response import text
@ -23,7 +18,6 @@ class ErrorHandler:
by the developers to perform a wide range of tasks from recording the error
stats to reporting them to an external service that can be used for
realtime alerting system.
"""
def __init__(
@ -196,74 +190,3 @@ class ErrorHandler:
error_logger.exception(
"Exception occurred while handling uri: %s", url
)
class ContentRangeHandler:
"""
A mechanism to parse and process the incoming request headers to
extract the content range information.
:param request: Incoming api request
:param stats: Stats related to the content
:type request: :class:`sanic.request.Request`
:type stats: :class:`posix.stat_result`
:ivar start: Content Range start
:ivar end: Content Range end
:ivar size: Length of the content
:ivar total: Total size identified by the :class:`posix.stat_result`
instance
:ivar ContentRangeHandler.headers: Content range header ``dict``
"""
__slots__ = ("start", "end", "size", "total", "headers")
def __init__(self, request, stats):
self.total = stats.st_size
_range = request.headers.getone("range", None)
if _range is None:
raise HeaderNotFound("Range Header Not Found")
unit, _, value = tuple(map(str.strip, _range.partition("=")))
if unit != "bytes":
raise InvalidRangeType(
"%s is not a valid Range Type" % (unit,), self
)
start_b, _, end_b = tuple(map(str.strip, value.partition("-")))
try:
self.start = int(start_b) if start_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (start_b,), self
)
try:
self.end = int(end_b) if end_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (end_b,), self
)
if self.end is None:
if self.start is None:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
else:
# this case represents `Content-Range: bytes 5-`
self.end = self.total - 1
else:
if self.start is None:
# this case represents `Content-Range: bytes -5`
self.start = self.total - self.end
self.end = self.total - 1
if self.start >= self.end:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
self.size = self.end - self.start + 1
self.headers = {
"Content-Range": "bytes %s-%s/%s"
% (self.start, self.end, self.total)
}
def __bool__(self):
return self.size > 0

View File

@ -126,7 +126,6 @@ class CertCreator(ABC):
local_tls_key,
local_tls_cert,
) -> CertCreator:
creator: Optional[CertCreator] = None
cert_creator_options: Tuple[

35
sanic/mixins/base.py Normal file
View File

@ -0,0 +1,35 @@
from typing import Optional
from sanic.base.meta import SanicMeta
class BaseMixin(metaclass=SanicMeta):
name: str
strict_slashes: Optional[bool]
def _generate_name(self, *objects) -> str:
name = None
for obj in objects:
if obj:
if isinstance(obj, str):
name = obj
break
try:
name = obj.name
except AttributeError:
try:
name = obj.__name__
except AttributeError:
continue
else:
break
if not name: # noqa
raise ValueError("Could not generate a name for handler")
if not name.startswith(f"{self.name}."):
name = f"{self.name}.{name}"
return name

View File

@ -1,11 +1,6 @@
from ast import NodeVisitor, Return, parse
from contextlib import suppress
from email.utils import formatdate
from functools import partial, wraps
from inspect import getsource, signature
from mimetypes import guess_type
from os import path
from pathlib import Path, PurePath
from textwrap import dedent
from typing import (
Any,
@ -19,20 +14,15 @@ from typing import (
Union,
cast,
)
from urllib.parse import unquote
from sanic_routing.route import Route
from sanic.base.meta import SanicMeta
from sanic.compat import stat_async
from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE, HTTP_METHODS
from sanic.constants import HTTP_METHODS
from sanic.errorpages import RESPONSE_MAPPING
from sanic.exceptions import FileNotFound, HeaderNotFound, RangeNotSatisfiable
from sanic.handlers import ContentRangeHandler
from sanic.log import error_logger
from sanic.mixins.base import BaseMixin
from sanic.models.futures import FutureRoute, FutureStatic
from sanic.models.handler_types import RouteHandler
from sanic.response import HTTPResponse, file, file_stream, validate_file
from sanic.types import HashableDict
@ -41,20 +31,14 @@ RouteWrapper = Callable[
]
class RouteMixin(metaclass=SanicMeta):
name: str
class RouteMixin(BaseMixin, metaclass=SanicMeta):
def __init__(self, *args, **kwargs) -> None:
self._future_routes: Set[FutureRoute] = set()
self._future_statics: Set[FutureStatic] = set()
self.strict_slashes: Optional[bool] = False
def _apply_route(self, route: FutureRoute) -> List[Route]:
raise NotImplementedError # noqa
def _apply_static(self, static: FutureStatic) -> Route:
raise NotImplementedError # noqa
def route(
self,
uri: str,
@ -688,324 +672,6 @@ class RouteMixin(metaclass=SanicMeta):
**ctx_kwargs,
)(handler)
def static(
self,
uri: str,
file_or_directory: Union[str, bytes, PurePath],
pattern: str = r"/?.+",
use_modified_since: bool = True,
use_content_range: bool = False,
stream_large_files: bool = False,
name: str = "static",
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
content_type: Optional[bool] = None,
apply: bool = True,
resource_type: Optional[str] = None,
):
"""
Register a root to serve files from. The input can either be a
file or a directory. This method will enable an easy and simple way
to setup the :class:`Route` necessary to serve the static files.
:param uri: URL path to be used for serving static content
:param file_or_directory: Path for the Static file/directory with
static files
:param pattern: Regex Pattern identifying the valid static files
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the
:func:`StreamingHTTPResponse.file_stream` handler rather
than the :func:`HTTPResponse.file` handler to send the file.
If this is an integer, this represents the threshold size to
switch to :func:`StreamingHTTPResponse.file_stream`
:param name: user defined name used for url_for
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param content_type: user defined content type for header
:return: routes registered on the router
:rtype: List[sanic.router.Route]
"""
name = self._generate_name(name)
if strict_slashes is None and self.strict_slashes is not None:
strict_slashes = self.strict_slashes
if not isinstance(file_or_directory, (str, bytes, PurePath)):
raise ValueError(
f"Static route must be a valid path, not {file_or_directory}"
)
static = FutureStatic(
uri,
file_or_directory,
pattern,
use_modified_since,
use_content_range,
stream_large_files,
name,
host,
strict_slashes,
content_type,
resource_type,
)
self._future_statics.add(static)
if apply:
self._apply_static(static)
def _generate_name(self, *objects) -> str:
name = None
for obj in objects:
if obj:
if isinstance(obj, str):
name = obj
break
try:
name = obj.name
except AttributeError:
try:
name = obj.__name__
except AttributeError:
continue
else:
break
if not name: # noqa
raise ValueError("Could not generate a name for handler")
if not name.startswith(f"{self.name}."):
name = f"{self.name}.{name}"
return name
async def _get_file_path(self, file_or_directory, __file_uri__, not_found):
file_path_raw = Path(unquote(file_or_directory))
root_path = file_path = file_path_raw.resolve()
if __file_uri__:
# Strip all / that in the beginning of the URL to help prevent
# python from herping a derp and treating the uri as an
# absolute path
unquoted_file_uri = unquote(__file_uri__).lstrip("/")
file_path_raw = Path(file_or_directory, unquoted_file_uri)
file_path = file_path_raw.resolve()
if (
file_path < root_path and not file_path_raw.is_symlink()
) or ".." in file_path_raw.parts:
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
try:
file_path.relative_to(root_path)
except ValueError:
if not file_path_raw.is_symlink():
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
return file_path
async def _static_request_handler(
self,
file_or_directory,
use_modified_since,
use_content_range,
stream_large_files,
request,
content_type=None,
__file_uri__=None,
):
not_found = FileNotFound(
"File not found",
path=file_or_directory,
relative_url=__file_uri__,
)
# Merge served directory and requested file if provided
file_path = await self._get_file_path(
file_or_directory, __file_uri__, not_found
)
try:
headers = {}
# Check if the client has been sent this file before
# and it has not been modified since
stats = None
if use_modified_since:
stats = await stat_async(file_path)
modified_since = stats.st_mtime
response = await validate_file(request.headers, modified_since)
if response:
return response
headers["Last-Modified"] = formatdate(
modified_since, usegmt=True
)
_range = None
if use_content_range:
_range = None
if not stats:
stats = await stat_async(file_path)
headers["Accept-Ranges"] = "bytes"
headers["Content-Length"] = str(stats.st_size)
if request.method != "HEAD":
try:
_range = ContentRangeHandler(request, stats)
except HeaderNotFound:
pass
else:
del headers["Content-Length"]
headers.update(_range.headers)
if "content-type" not in headers:
content_type = (
content_type
or guess_type(file_path)[0]
or DEFAULT_HTTP_CONTENT_TYPE
)
if "charset=" not in content_type and (
content_type.startswith("text/")
or content_type == "application/javascript"
):
content_type += "; charset=utf-8"
headers["Content-Type"] = content_type
if request.method == "HEAD":
return HTTPResponse(headers=headers)
else:
if stream_large_files:
if type(stream_large_files) == int:
threshold = stream_large_files
else:
threshold = 1024 * 1024
if not stats:
stats = await stat_async(file_path)
if stats.st_size >= threshold:
return await file_stream(
file_path, headers=headers, _range=_range
)
return await file(file_path, headers=headers, _range=_range)
except RangeNotSatisfiable:
raise
except FileNotFoundError:
raise not_found
except Exception:
error_logger.exception(
f"Exception in static request handler: "
f"path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise
def _register_static(
self,
static: FutureStatic,
):
# TODO: Though sanic is not a file server, I feel like we should
# at least make a good effort here. Modified-since is nice, but
# we could also look into etags, expires, and caching
"""
Register a static directory handler with Sanic by adding a route to the
router and registering a handler.
:param app: Sanic
:param file_or_directory: File or directory path to serve from
:type file_or_directory: Union[str,bytes,Path]
:param uri: URL to serve from
:type uri: str
:param pattern: regular expression used to match files in the URL
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the
server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the file_stream() handler
rather than the file() handler to send the file
If this is an integer, this represents the
threshold size to switch to file_stream()
:param name: user defined name used for url_for
:type name: str
:param content_type: user defined content type for header
:return: registered static routes
:rtype: List[sanic.router.Route]
"""
if isinstance(static.file_or_directory, bytes):
file_or_directory = static.file_or_directory.decode("utf-8")
elif isinstance(static.file_or_directory, PurePath):
file_or_directory = str(static.file_or_directory)
elif not isinstance(static.file_or_directory, str):
raise ValueError("Invalid file path string.")
else:
file_or_directory = static.file_or_directory
uri = static.uri
name = static.name
# If we're not trying to match a file directly,
# serve from the folder
if not static.resource_type:
if not path.isfile(file_or_directory):
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "dir":
if path.isfile(file_or_directory):
raise TypeError(
"Resource type improperly identified as directory. "
f"'{file_or_directory}'"
)
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "file" and not path.isfile(
file_or_directory
):
raise TypeError(
"Resource type improperly identified as file. "
f"'{file_or_directory}'"
)
elif static.resource_type != "file":
raise ValueError(
"The resource_type should be set to 'file' or 'dir'"
)
# special prefix for static files
# if not static.name.startswith("_static_"):
# name = f"_static_{static.name}"
_handler = wraps(self._static_request_handler)(
partial(
self._static_request_handler,
file_or_directory,
static.use_modified_since,
static.use_content_range,
static.stream_large_files,
content_type=static.content_type,
)
)
route, _ = self.route( # type: ignore
uri=uri,
methods=["GET", "HEAD"],
name=name,
host=static.host,
strict_slashes=static.strict_slashes,
static=True,
)(_handler)
return route
def _determine_error_format(self, handler) -> str:
with suppress(OSError, TypeError):
src = dedent(getsource(handler))

View File

@ -1109,7 +1109,6 @@ class StartupMixin(metaclass=SanicMeta):
app: StartupMixin,
server_info: ApplicationServerInfo,
) -> None: # no cov
try:
# We should never get to this point without a server
# This is primarily to keep mypy happy

348
sanic/mixins/static.py Normal file
View File

@ -0,0 +1,348 @@
from email.utils import formatdate
from functools import partial, wraps
from mimetypes import guess_type
from os import PathLike, path
from pathlib import Path, PurePath
from typing import Optional, Sequence, Set, Union, cast
from urllib.parse import unquote
from sanic_routing.route import Route
from sanic.base.meta import SanicMeta
from sanic.compat import stat_async
from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE
from sanic.exceptions import FileNotFound, HeaderNotFound, RangeNotSatisfiable
from sanic.handlers import ContentRangeHandler
from sanic.handlers.directory import DirectoryHandler
from sanic.log import deprecation, error_logger
from sanic.mixins.base import BaseMixin
from sanic.models.futures import FutureStatic
from sanic.request import Request
from sanic.response import HTTPResponse, file, file_stream, validate_file
class StaticMixin(BaseMixin, metaclass=SanicMeta):
def __init__(self, *args, **kwargs) -> None:
self._future_statics: Set[FutureStatic] = set()
def _apply_static(self, static: FutureStatic) -> Route:
raise NotImplementedError # noqa
def static(
self,
uri: str,
file_or_directory: Union[PathLike, str, bytes],
pattern: str = r"/?.+",
use_modified_since: bool = True,
use_content_range: bool = False,
stream_large_files: Union[bool, int] = False,
name: str = "static",
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
content_type: Optional[str] = None,
apply: bool = True,
resource_type: Optional[str] = None,
index: Optional[Union[str, Sequence[str]]] = None,
directory_view: bool = False,
directory_handler: Optional[DirectoryHandler] = None,
):
"""
Register a root to serve files from. The input can either be a
file or a directory. This method will enable an easy and simple way
to setup the :class:`Route` necessary to serve the static files.
:param uri: URL path to be used for serving static content
:param file_or_directory: Path for the Static file/directory with
static files
:param pattern: Regex Pattern identifying the valid static files
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the
:func:`StreamingHTTPResponse.file_stream` handler rather
than the :func:`HTTPResponse.file` handler to send the file.
If this is an integer, this represents the threshold size to
switch to :func:`StreamingHTTPResponse.file_stream`
:param name: user defined name used for url_for
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param content_type: user defined content type for header
:param apply: If true, will register the route immediately
:param resource_type: Explicitly declare a resource to be a "
file" or a "dir"
:param index: When exposing against a directory, index is the name that
will be served as the default file. When multiple files names are
passed, then they will be tried in order.
:param directory_view: Whether to fallback to showing the directory
viewer when exposing a directory
:param directory_handler: An instance of :class:`DirectoryHandler`
that can be used for explicitly controlling and subclassing the
behavior of the default directory handler
:return: routes registered on the router
:rtype: List[sanic.router.Route]
"""
name = self._generate_name(name)
if strict_slashes is None and self.strict_slashes is not None:
strict_slashes = self.strict_slashes
if not isinstance(file_or_directory, (str, bytes, PurePath)):
raise ValueError(
f"Static route must be a valid path, not {file_or_directory}"
)
if isinstance(file_or_directory, bytes):
deprecation(
"Serving a static directory with a bytes string is "
"deprecated and will be removed in v22.9.",
22.9,
)
file_or_directory = cast(str, file_or_directory.decode())
file_or_directory = Path(file_or_directory)
if directory_handler and (directory_view or index):
raise ValueError(
"When explicitly setting directory_handler, you cannot "
"set either directory_view or index. Instead, pass "
"these arguments to your DirectoryHandler instance."
)
if not directory_handler:
directory_handler = DirectoryHandler(
uri=uri,
directory=file_or_directory,
directory_view=directory_view,
index=index,
)
static = FutureStatic(
uri,
file_or_directory,
pattern,
use_modified_since,
use_content_range,
stream_large_files,
name,
host,
strict_slashes,
content_type,
resource_type,
directory_handler,
)
self._future_statics.add(static)
if apply:
self._apply_static(static)
class StaticHandleMixin(metaclass=SanicMeta):
def _apply_static(self, static: FutureStatic) -> Route:
return self._register_static(static)
def _register_static(
self,
static: FutureStatic,
):
# TODO: Though sanic is not a file server, I feel like we should
# at least make a good effort here. Modified-since is nice, but
# we could also look into etags, expires, and caching
"""
Register a static directory handler with Sanic by adding a route to the
router and registering a handler.
"""
if isinstance(static.file_or_directory, bytes):
file_or_directory = static.file_or_directory.decode("utf-8")
elif isinstance(static.file_or_directory, PurePath):
file_or_directory = str(static.file_or_directory)
elif not isinstance(static.file_or_directory, str):
raise ValueError("Invalid file path string.")
else:
file_or_directory = static.file_or_directory
uri = static.uri
name = static.name
# If we're not trying to match a file directly,
# serve from the folder
if not static.resource_type:
if not path.isfile(file_or_directory):
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "dir":
if path.isfile(file_or_directory):
raise TypeError(
"Resource type improperly identified as directory. "
f"'{file_or_directory}'"
)
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "file" and not path.isfile(
file_or_directory
):
raise TypeError(
"Resource type improperly identified as file. "
f"'{file_or_directory}'"
)
elif static.resource_type != "file":
raise ValueError(
"The resource_type should be set to 'file' or 'dir'"
)
# special prefix for static files
# if not static.name.startswith("_static_"):
# name = f"_static_{static.name}"
_handler = wraps(self._static_request_handler)(
partial(
self._static_request_handler,
file_or_directory=file_or_directory,
use_modified_since=static.use_modified_since,
use_content_range=static.use_content_range,
stream_large_files=static.stream_large_files,
content_type=static.content_type,
directory_handler=static.directory_handler,
)
)
route, _ = self.route( # type: ignore
uri=uri,
methods=["GET", "HEAD"],
name=name,
host=static.host,
strict_slashes=static.strict_slashes,
static=True,
)(_handler)
return route
async def _static_request_handler(
self,
request: Request,
*,
file_or_directory: PathLike,
use_modified_since: bool,
use_content_range: bool,
stream_large_files: Union[bool, int],
directory_handler: DirectoryHandler,
content_type: Optional[str] = None,
__file_uri__: Optional[str] = None,
):
not_found = FileNotFound(
"File not found",
path=file_or_directory,
relative_url=__file_uri__,
)
# Merge served directory and requested file if provided
file_path = await self._get_file_path(
file_or_directory, __file_uri__, not_found
)
try:
headers = {}
# Check if the client has been sent this file before
# and it has not been modified since
stats = None
if use_modified_since:
stats = await stat_async(file_path)
modified_since = stats.st_mtime
response = await validate_file(request.headers, modified_since)
if response:
return response
headers["Last-Modified"] = formatdate(
modified_since, usegmt=True
)
_range = None
if use_content_range:
_range = None
if not stats:
stats = await stat_async(file_path)
headers["Accept-Ranges"] = "bytes"
headers["Content-Length"] = str(stats.st_size)
if request.method != "HEAD":
try:
_range = ContentRangeHandler(request, stats)
except HeaderNotFound:
pass
else:
del headers["Content-Length"]
headers.update(_range.headers)
if "content-type" not in headers:
content_type = (
content_type
or guess_type(file_path)[0]
or DEFAULT_HTTP_CONTENT_TYPE
)
if "charset=" not in content_type and (
content_type.startswith("text/")
or content_type == "application/javascript"
):
content_type += "; charset=utf-8"
headers["Content-Type"] = content_type
if request.method == "HEAD":
return HTTPResponse(headers=headers)
else:
if stream_large_files:
if isinstance(stream_large_files, bool):
threshold = 1024 * 1024
else:
threshold = stream_large_files
if not stats:
stats = await stat_async(file_path)
if stats.st_size >= threshold:
return await file_stream(
file_path, headers=headers, _range=_range
)
return await file(file_path, headers=headers, _range=_range)
except (IsADirectoryError, PermissionError):
return await directory_handler.handle(request, request.path)
except RangeNotSatisfiable:
raise
except FileNotFoundError:
raise not_found
except Exception:
error_logger.exception(
"Exception in static request handler: "
f"path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise
async def _get_file_path(self, file_or_directory, __file_uri__, not_found):
file_path_raw = Path(unquote(file_or_directory))
root_path = file_path = file_path_raw.resolve()
if __file_uri__:
# Strip all / that in the beginning of the URL to help prevent
# python from herping a derp and treating the uri as an
# absolute path
unquoted_file_uri = unquote(__file_uri__).lstrip("/")
file_path_raw = Path(file_or_directory, unquoted_file_uri)
file_path = file_path_raw.resolve()
if (
file_path < root_path and not file_path_raw.is_symlink()
) or ".." in file_path_raw.parts:
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
try:
file_path.relative_to(root_path)
except ValueError:
if not file_path_raw.is_symlink():
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
return file_path

View File

@ -1,6 +1,7 @@
from pathlib import PurePath
from pathlib import Path
from typing import Dict, Iterable, List, NamedTuple, Optional, Union
from sanic.handlers.directory import DirectoryHandler
from sanic.models.handler_types import (
ErrorMiddlewareType,
ListenerType,
@ -46,16 +47,17 @@ class FutureException(NamedTuple):
class FutureStatic(NamedTuple):
uri: str
file_or_directory: Union[str, bytes, PurePath]
file_or_directory: Path
pattern: str
use_modified_since: bool
use_content_range: bool
stream_large_files: bool
stream_large_files: Union[bool, int]
name: str
host: Optional[str]
strict_slashes: Optional[bool]
content_type: Optional[bool]
content_type: Optional[str]
resource_type: Optional[str]
directory_handler: DirectoryHandler
class FutureSignal(NamedTuple):

0
sanic/pages/__init__.py Normal file
View File

51
sanic/pages/base.py Normal file
View File

@ -0,0 +1,51 @@
from abc import ABC, abstractmethod
from html5tagger import HTML, Document
from sanic import __version__ as VERSION
from sanic.application.logo import SVG_LOGO
from sanic.pages.css import CSS
class BasePage(ABC, metaclass=CSS): # no cov
TITLE = "Unknown"
CSS: str
def __init__(self, debug: bool = True) -> None:
self.doc = Document(self.TITLE, lang="en")
self.debug = debug
@property
def style(self) -> str:
return self.CSS
def render(self) -> str:
self._head()
self._body()
self._foot()
return str(self.doc)
def _head(self) -> None:
self.doc.style(HTML(self.style))
with self.doc.header:
self.doc.div(self.TITLE)
def _foot(self) -> None:
with self.doc.footer:
self.doc.div("powered by")
with self.doc.div:
self._sanic_logo()
if self.debug:
self.doc.div(f"Version {VERSION}")
@abstractmethod
def _body(self) -> None:
...
def _sanic_logo(self) -> None:
self.doc.a(
HTML(SVG_LOGO),
href="https://sanic.dev",
target="_blank",
referrerpolicy="no-referrer",
)

35
sanic/pages/css.py Normal file
View File

@ -0,0 +1,35 @@
from abc import ABCMeta
from pathlib import Path
from typing import Optional
CURRENT_DIR = Path(__file__).parent
def _extract_style(maybe_style: Optional[str], name: str) -> str:
if maybe_style is not None:
maybe_path = Path(maybe_style)
if maybe_path.exists():
return maybe_path.read_text(encoding="UTF-8")
return maybe_style
maybe_path = CURRENT_DIR / "styles" / f"{name}.css"
if maybe_path.exists():
return maybe_path.read_text(encoding="UTF-8")
return ""
class CSS(ABCMeta):
"""Cascade stylesheets, i.e. combine all ancestor styles"""
def __new__(cls, name, bases, attrs):
Page = super().__new__(cls, name, bases, attrs)
# Use a locally defined STYLE or the one from styles directory
s = _extract_style(attrs.get("STYLE"), name)
Page.STYLE = f"\n/* {name} */\n{s.strip()}\n" if s else ""
# Combine with all ancestor styles
Page.CSS = "".join(
Class.STYLE
for Class in reversed(Page.__mro__)
if type(Class) is CSS
)
return Page

View File

@ -0,0 +1,66 @@
import sys
from typing import Dict, Iterable
from html5tagger import E
from .base import BasePage
if sys.version_info < (3, 8): # no cov
FileInfo = Dict
else:
from typing import TypedDict
class FileInfo(TypedDict):
icon: str
file_name: str
file_access: str
file_size: str
class DirectoryPage(BasePage): # no cov
TITLE = "Directory Viewer"
def __init__(
self, files: Iterable[FileInfo], url: str, debug: bool
) -> None:
super().__init__(debug)
self.files = files
self.url = url
def _body(self) -> None:
with self.doc.main:
self._headline()
files = list(self.files)
if files:
self._file_table(files)
else:
self.doc.p("The folder is empty.")
def _headline(self):
"""Implement a heading with the current path, combined with
breadcrumb links"""
with self.doc.h1(id="breadcrumbs"):
p = self.url.split("/")[:-1]
for i, part in enumerate(p):
path = "/".join(p[: i + 1]) + "/"
with self.doc.a(href=path):
self.doc.span(part, class_="dir").span("/", class_="sep")
def _file_table(self, files: Iterable[FileInfo]):
with self.doc.table(class_="autoindex container"):
for f in files:
self._file_row(**f)
def _file_row(
self,
icon: str,
file_name: str,
file_access: str,
file_size: str,
):
first = E.span(icon, class_="icon").a(file_name, href=file_name)
self.doc.tr.td(first).td(file_size).td(file_access)

View File

@ -0,0 +1,79 @@
html {
font: 16px sans-serif;
background: #eee;
color: #111;
}
body {
margin: 0;
font-size: 1.25rem;
}
body>* {
padding: 1rem 2vw;
}
@media (max-width: 1200px) {
body>* {
padding: 0.5rem 1.5vw;
}
body {
font-size: 1rem;
}
}
.container {
min-width: 600px;
max-width: 1600px;
}
header {
background: #111;
color: #e1e1e1;
border-bottom: 1px solid #272727;
text-align: center;
}
footer {
text-align: center;
display: flex;
flex-direction: column;
font-size: 0.8rem;
margin-top: 2rem;
}
h1 {
text-align: left;
}
a:visited {
color: inherit;
}
a {
text-decoration: none;
color: #88f;
}
a:hover,
a:focus {
text-decoration: underline;
outline: none;
}
#logo {
height: 1.75rem;
padding: 0 0.25rem;
}
span.icon {
margin-right: 1rem;
}
@media (prefers-color-scheme: dark) {
html {
background: #111;
color: #ccc;
}
}

View File

@ -0,0 +1,62 @@
#breadcrumbs>a:hover {
text-decoration: none;
}
#breadcrumbs>a .dir {
padding: 0 0.25em;
}
#breadcrumbs>a:first-child:hover::before,
#breadcrumbs>a .dir:hover {
text-decoration: underline;
}
#breadcrumbs>a:first-child::before {
content: "🏠";
}
#breadcrumbs>a:last-child {
color: #ff0d68;
}
main a {
color: inherit;
font-weight: bold;
}
table.autoindex {
width: 100%;
font-family: monospace;
font-size: 1.25rem;
}
table.autoindex tr {
display: flex;
}
table.autoindex tr:hover {
background-color: #ddd;
}
table.autoindex td {
margin: 0 0.5rem;
}
table.autoindex td:first-child {
flex: 1;
}
table.autoindex td:nth-child(2) {
text-align: right;
}
table.autoindex td:last-child {
text-align: right;
}
@media (prefers-color-scheme: dark) {
table.autoindex tr:hover {
background-color: #222;
}
}

View File

@ -146,7 +146,6 @@ class Request:
head: bytes = b"",
stream_id: int = 0,
):
self.raw_url = url_bytes
try:
self._parsed_url = parse_url(url_bytes)

View File

@ -94,7 +94,6 @@ def watchdog(sleep_interval, reload_dirs):
try:
while True:
changed = set()
for filename in itertools.chain(
_iter_module_files(),

View File

@ -52,7 +52,6 @@ class WebsocketFrameAssembler:
paused: bool
def __init__(self, protocol) -> None:
self.protocol = protocol
self.read_mutex = asyncio.Lock()

View File

@ -686,7 +686,6 @@ class WebsocketImplProtocol:
:raises TypeError: for unsupported inputs
"""
async with self.conn_mutex:
if self.ws_proto.state in (CLOSED, CLOSING):
raise WebsocketClosed(
"Cannot write to websocket interface after it is closed."

View File

@ -2,7 +2,6 @@ from pathlib import Path
from sanic import Sanic
from sanic.exceptions import SanicException
from sanic.response import redirect
def create_simple_server(directory: Path):
@ -12,10 +11,8 @@ def create_simple_server(directory: Path):
)
app = Sanic("SimpleServer")
app.static("/", directory, name="main")
@app.get("/")
def index(_):
return redirect(app.url_for("main", filename="index.html"))
app.static(
"/", directory, name="main", directory_view=True, index="index.html"
)
return app

View File

@ -11,7 +11,6 @@ class TouchUpMeta(SanicMeta):
methods = attrs.get("__touchup__")
attrs["__touched__"] = False
if methods:
for method in methods:
if method not in attrs:
raise SanicException(

View File

@ -75,7 +75,6 @@ def load_module_from_file_location(
location = location.decode(encoding)
if isinstance(location, Path) or "/" in location or "$" in location:
if not isinstance(location, Path):
# A) Check if location contains any environment variables
# in format ${some_env_var}.

View File

@ -35,6 +35,7 @@ def open_local(paths, mode="r", encoding="utf8"):
return codecs.open(path, mode, encoding)
def str_to_bool(val: str) -> bool:
val = val.lower()
if val in {
@ -55,6 +56,7 @@ def str_to_bool(val: str) -> bool:
else:
raise ValueError(f"Invalid truth value {val}")
with open_local(["sanic", "__version__.py"], encoding="latin1") as fp:
try:
version = re.findall(
@ -79,7 +81,7 @@ setup_kwargs = {
),
"long_description": long_description,
"packages": find_packages(exclude=("tests", "tests.*")),
"package_data": {"sanic": ["py.typed"]},
"package_data": {"sanic": ["py.typed", "pages/styles/*"]},
"platforms": "any",
"python_requires": ">=3.7",
"classifiers": [
@ -109,6 +111,7 @@ requirements = [
"aiofiles>=0.6.0",
"websockets>=10.0",
"multidict>=5.0,<7.0",
"html5tagger>=1.2.1",
]
tests_require = [

View File

@ -1,5 +1,7 @@
import asyncio
import inspect
import logging
import os
import random
import re
import string
@ -58,7 +60,6 @@ CACHE: Dict[str, Any] = {}
class RouteStringGenerator:
ROUTE_COUNT_PER_DEPTH = 100
HTTP_METHODS = HTTP_METHODS
ROUTE_PARAM_TYPES = ["str", "int", "float", "alpha", "uuid"]
@ -232,3 +233,12 @@ def urlopen():
urlopen.read = Mock()
with patch("sanic.cli.inspector_client.urlopen", urlopen):
yield urlopen
@pytest.fixture(scope="module")
def static_file_directory():
"""The static directory to serve"""
current_file = inspect.getfile(inspect.currentframe())
current_directory = os.path.dirname(os.path.abspath(current_file))
static_directory = os.path.join(current_directory, "static")
return static_directory

View File

@ -36,6 +36,7 @@ def test_app_loop_running(app: Sanic):
assert response.text == "pass"
@pytest.mark.asyncio
def test_create_asyncio_server(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(return_asyncio_server=True)
@ -44,6 +45,7 @@ def test_create_asyncio_server(app: Sanic):
assert srv.is_serving() is True
@pytest.mark.asyncio
def test_asyncio_server_no_start_serving(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
@ -55,6 +57,7 @@ def test_asyncio_server_no_start_serving(app: Sanic):
assert srv.is_serving() is False
@pytest.mark.asyncio
def test_asyncio_server_start_serving(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
@ -72,6 +75,7 @@ def test_asyncio_server_start_serving(app: Sanic):
# Looks like we can't easily test `serve_forever()`
@pytest.mark.asyncio
def test_create_server_main(app: Sanic, caplog):
app.listener("main_process_start")(lambda *_: ...)
loop = asyncio.get_event_loop()
@ -86,6 +90,7 @@ def test_create_server_main(app: Sanic, caplog):
) in caplog.record_tuples
@pytest.mark.asyncio
def test_create_server_no_startup(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
@ -101,6 +106,7 @@ def test_create_server_no_startup(app: Sanic):
loop.run_until_complete(srv.start_serving())
@pytest.mark.asyncio
def test_create_server_main_convenience(app: Sanic, caplog):
app.main_process_start(lambda *_: ...)
loop = asyncio.get_event_loop()
@ -126,7 +132,6 @@ def test_app_loop_not_running(app: Sanic):
def test_app_run_raise_type_error(app: Sanic):
with pytest.raises(TypeError) as excinfo:
app.run(loop="loop")
@ -139,7 +144,6 @@ def test_app_run_raise_type_error(app: Sanic):
def test_app_route_raise_value_error(app: Sanic):
with pytest.raises(ValueError) as excinfo:
@app.route("/test")
@ -221,7 +225,6 @@ def test_app_websocket_parameters(websocket_protocol_mock, app: Sanic):
def test_handle_request_with_nested_exception(app: Sanic, monkeypatch):
err_msg = "Mock Exception"
def mock_error_handler_response(*args, **kwargs):
@ -241,7 +244,6 @@ def test_handle_request_with_nested_exception(app: Sanic, monkeypatch):
def test_handle_request_with_nested_exception_debug(app: Sanic, monkeypatch):
err_msg = "Mock Exception"
def mock_error_handler_response(*args, **kwargs):
@ -470,6 +472,7 @@ def test_uvloop_config(app: Sanic, monkeypatch, use):
try_use_uvloop.assert_not_called()
@pytest.mark.asyncio
def test_uvloop_cannot_never_called_with_create_server(caplog, monkeypatch):
apps = (Sanic("default-uvloop"), Sanic("no-uvloop"), Sanic("yes-uvloop"))
@ -506,6 +509,7 @@ def test_uvloop_cannot_never_called_with_create_server(caplog, monkeypatch):
assert counter[(logging.WARNING, message)] == modified
@pytest.mark.asyncio
def test_multiple_uvloop_configs_display_warning(caplog):
Sanic._uvloop_setting = None # Reset the setting (changed in prev tests)

View File

@ -148,7 +148,6 @@ def test_cookie_set_unknown_property():
def test_cookie_set_same_key(app):
cookies = {"test": "wait"}
@app.get("/")

View File

@ -62,7 +62,6 @@ def exception_handler_app():
@exception_handler_app.route("/8", error_format="html")
def handler_8(request):
raise ErrorWithRequestCtx("OK")
@exception_handler_app.exception(ErrorWithRequestCtx, NotFound)
@ -214,7 +213,7 @@ def test_error_handler_noisy_log(
exception_handler_app: Sanic, monkeypatch: MonkeyPatch
):
err_logger = Mock()
monkeypatch.setattr(handlers, "error_logger", err_logger)
monkeypatch.setattr(handlers.error, "error_logger", err_logger)
exception_handler_app.config["NOISY_EXCEPTIONS"] = False
exception_handler_app.test_client.get("/1")

View File

@ -514,7 +514,6 @@ def test_file_stream_head_response(
def test_file_stream_response_range(
app: Sanic, file_name, static_file_directory, size, start, end
):
Range = namedtuple("Range", ["size", "start", "end", "total"])
total = len(get_file_content(static_file_directory, file_name))
range = Range(size=size, start=start, end=end, total=total)

View File

@ -722,7 +722,6 @@ def test_add_webscoket_route_with_version(app):
def test_route_duplicate(app):
with pytest.raises(RouteExists):
@app.route("/test")
@ -819,7 +818,6 @@ def test_unquote_add_route(app, unquote):
def test_dynamic_add_route(app):
results = []
async def handler(request, name):
@ -834,7 +832,6 @@ def test_dynamic_add_route(app):
def test_dynamic_add_route_string(app):
results = []
async def handler(request, name):
@ -938,7 +935,6 @@ def test_dynamic_add_route_unhashable(app):
def test_add_route_duplicate(app):
with pytest.raises(RouteExists):
async def handler1(request):
@ -1120,7 +1116,6 @@ def test_route_raise_ParameterNameConflicts(app):
def test_route_invalid_host(app):
host = 321
with pytest.raises(ValueError) as excinfo:

View File

@ -93,6 +93,7 @@ def test_dont_register_system_signals(app):
@pytest.mark.skipif(os.name == "nt", reason="windows cannot SIGINT processes")
def test_windows_workaround():
"""Test Windows workaround (on any other OS)"""
# At least some code coverage, even though this test doesn't work on
# Windows...
class MockApp:

View File

@ -1,4 +1,3 @@
import inspect
import logging
import os
import sys
@ -13,15 +12,6 @@ from sanic import Sanic, text
from sanic.exceptions import FileNotFound
@pytest.fixture(scope="module")
def static_file_directory():
"""The static directory to serve"""
current_file = inspect.getfile(inspect.currentframe())
current_directory = os.path.dirname(os.path.abspath(current_file))
static_directory = os.path.join(current_directory, "static")
return static_directory
@pytest.fixture(scope="module")
def double_dotted_directory_file(static_file_directory: str):
"""Generate double dotted directory and its files"""
@ -118,6 +108,11 @@ def test_static_file_pathlib(app, static_file_directory, file_name):
def test_static_file_bytes(app, static_file_directory, file_name):
bsep = os.path.sep.encode("utf-8")
file_path = static_file_directory.encode("utf-8") + bsep + file_name
message = (
"Serving a static directory with a bytes "
"string is deprecated and will be removed in v22.9."
)
with pytest.warns(DeprecationWarning, match=message):
app.static("/testing.file", file_path)
request, response = app.test_client.get("/testing.file")
assert response.status == 200
@ -431,7 +426,6 @@ def test_static_stream_large_file(
"file_name", ["test.file", "decode me.txt", "python.png"]
)
def test_use_modified_since(app, static_file_directory, file_name):
file_stat = os.stat(get_file_path(static_file_directory, file_name))
modified_since = strftime(
"%a, %d %b %Y %H:%M:%S GMT", gmtime(file_stat.st_mtime)

View File

@ -0,0 +1,123 @@
import os
from pathlib import Path
import pytest
from sanic import Sanic
from sanic.handlers.directory import DirectoryHandler
def get_file_path(static_file_directory, file_name):
return os.path.join(static_file_directory, file_name)
def get_file_content(static_file_directory, file_name):
"""The content of the static file to check"""
with open(get_file_path(static_file_directory, file_name), "rb") as file:
return file.read()
def test_static_directory_view(app: Sanic, static_file_directory: str):
app.static("/static", static_file_directory, directory_view=True)
_, response = app.test_client.get("/static/")
assert response.status == 200
assert response.content_type == "text/html; charset=utf-8"
assert "<title>Directory Viewer</title>" in response.text
def test_static_index_single(app: Sanic, static_file_directory: str):
app.static("/static", static_file_directory, index="test.html")
_, response = app.test_client.get("/static/")
assert response.status == 200
assert response.body == get_file_content(
static_file_directory, "test.html"
)
assert response.headers["Content-Type"] == "text/html"
def test_static_index_single_not_found(app: Sanic, static_file_directory: str):
app.static("/static", static_file_directory, index="index.html")
_, response = app.test_client.get("/static/")
assert response.status == 404
def test_static_index_multiple(app: Sanic, static_file_directory: str):
app.static(
"/static",
static_file_directory,
index=["index.html", "test.html"],
)
_, response = app.test_client.get("/static/")
assert response.status == 200
assert response.body == get_file_content(
static_file_directory, "test.html"
)
assert response.headers["Content-Type"] == "text/html"
def test_static_directory_view_and_index(
app: Sanic, static_file_directory: str
):
app.static(
"/static",
static_file_directory,
directory_view=True,
index="foo.txt",
)
_, response = app.test_client.get("/static/nested/")
assert response.status == 200
assert response.content_type == "text/html; charset=utf-8"
assert "<title>Directory Viewer</title>" in response.text
_, response = app.test_client.get("/static/nested/dir/")
assert response.status == 200
assert response.body == get_file_content(
f"{static_file_directory}/nested/dir", "foo.txt"
)
assert response.content_type == "text/plain"
def test_static_directory_handler(app: Sanic, static_file_directory: str):
dh = DirectoryHandler(
"/static",
Path(static_file_directory),
directory_view=True,
index="foo.txt",
)
app.static("/static", static_file_directory, directory_handler=dh)
_, response = app.test_client.get("/static/nested/")
assert response.status == 200
assert response.content_type == "text/html; charset=utf-8"
assert "<title>Directory Viewer</title>" in response.text
_, response = app.test_client.get("/static/nested/dir/")
assert response.status == 200
assert response.body == get_file_content(
f"{static_file_directory}/nested/dir", "foo.txt"
)
assert response.content_type == "text/plain"
def test_static_directory_handler_fails(app: Sanic):
dh = DirectoryHandler(
"/static",
Path(""),
directory_view=True,
index="foo.txt",
)
message = (
"When explicitly setting directory_handler, you cannot "
"set either directory_view or index. Instead, pass "
"these arguments to your DirectoryHandler instance."
)
with pytest.raises(ValueError, match=message):
app.static("/static", "", directory_handler=dh, directory_view=True)
with pytest.raises(ValueError, match=message):
app.static("/static", "", directory_handler=dh, index="index.html")

View File

@ -654,7 +654,6 @@ def test_sanic_ssl_context_create():
reason="This test requires fork context",
)
def test_ssl_in_multiprocess_mode(app: Sanic, caplog):
ssl_dict = {"cert": localhost_cert, "key": localhost_key}
event = Event()

View File

@ -176,7 +176,6 @@ def handler(request: Request):
async def client(app: Sanic, loop: AbstractEventLoop):
try:
transport = httpx.AsyncHTTPTransport(uds=SOCKPATH)
async with httpx.AsyncClient(transport=transport) as client:
r = await client.get("http://myhost.invalid/")

View File

@ -83,7 +83,6 @@ def test_simple_url_for_getting_with_more_params(app, args, url):
def test_url_for_with_server_name(app):
server_name = f"{test_host}:{test_port}"
app.config.update({"SERVER_NAME": server_name})
path = "/myurl"

View File

@ -38,7 +38,6 @@ def test_load_module_from_file_location_with_non_existing_env_variable():
LoadFileException,
match="The following environment variables are not set: MuuMilk",
):
load_module_from_file_location("${MuuMilk}")