diff --git a/sanic/errorpages.py b/sanic/errorpages.py index c035dce1..a2d02bf0 100644 --- a/sanic/errorpages.py +++ b/sanic/errorpages.py @@ -12,6 +12,7 @@ Setting ``app.config.FALLBACK_ERROR_FORMAT = "auto"`` will enable a switch that will attempt to provide an appropriate response format based upon the request type. """ +from __future__ import annotations import sys import typing as t @@ -21,8 +22,7 @@ from traceback import extract_tb from sanic.exceptions import BadRequest, SanicException from sanic.helpers import STATUS_CODES -from sanic.request import Request -from sanic.response import HTTPResponse, html, json, text +from sanic.response import html, json, text dumps: t.Callable[..., str] @@ -33,6 +33,8 @@ try: except ImportError: # noqa from json import dumps +if t.TYPE_CHECKING: + from sanic import HTTPResponse, Request DEFAULT_FORMAT = "auto" FALLBACK_TEXT = ( diff --git a/sanic/exceptions.py b/sanic/exceptions.py index 8baac0a4..64b87ed6 100644 --- a/sanic/exceptions.py +++ b/sanic/exceptions.py @@ -1,4 +1,5 @@ from asyncio import CancelledError +from pathlib import Path from typing import Any, Dict, Optional, Union from sanic.helpers import STATUS_CODES @@ -269,3 +270,10 @@ class InvalidSignal(SanicException): class WebsocketClosed(SanicException): quiet = True message = "Client has closed the websocket connection" + + +class SanicIsADirectoryError(SanicException): + quiet = True + location: Path + autoindex: bool + index_name: str diff --git a/sanic/handlers/__init__.py b/sanic/handlers/__init__.py new file mode 100644 index 00000000..4777a8da --- /dev/null +++ b/sanic/handlers/__init__.py @@ -0,0 +1,10 @@ +from .content_range import ContentRangeHandler +from .directory import DirectoryHandler +from .error import ErrorHandler + + +__all__ = ( + "ContentRangeHandler", + "DirectoryHandler", + "ErrorHandler", +) diff --git a/sanic/handlers/content_range.py b/sanic/handlers/content_range.py new file mode 100644 index 00000000..f3a23d1a --- /dev/null +++ b/sanic/handlers/content_range.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +from sanic.exceptions import ( + HeaderNotFound, + InvalidRangeType, + RangeNotSatisfiable, +) + + +class ContentRangeHandler: + """ + A mechanism to parse and process the incoming request headers to + extract the content range information. + + :param request: Incoming api request + :param stats: Stats related to the content + + :type request: :class:`sanic.request.Request` + :type stats: :class:`posix.stat_result` + + :ivar start: Content Range start + :ivar end: Content Range end + :ivar size: Length of the content + :ivar total: Total size identified by the :class:`posix.stat_result` + instance + :ivar ContentRangeHandler.headers: Content range header ``dict`` + """ + + __slots__ = ("start", "end", "size", "total", "headers") + + def __init__(self, request, stats): + self.total = stats.st_size + _range = request.headers.getone("range", None) + if _range is None: + raise HeaderNotFound("Range Header Not Found") + unit, _, value = tuple(map(str.strip, _range.partition("="))) + if unit != "bytes": + raise InvalidRangeType( + "%s is not a valid Range Type" % (unit,), self + ) + start_b, _, end_b = tuple(map(str.strip, value.partition("-"))) + try: + self.start = int(start_b) if start_b else None + except ValueError: + raise RangeNotSatisfiable( + "'%s' is invalid for Content Range" % (start_b,), self + ) + try: + self.end = int(end_b) if end_b else None + except ValueError: + raise RangeNotSatisfiable( + "'%s' is invalid for Content Range" % (end_b,), self + ) + if self.end is None: + if self.start is None: + raise RangeNotSatisfiable( + "Invalid for Content Range parameters", self + ) + else: + # this case represents `Content-Range: bytes 5-` + self.end = self.total - 1 + else: + if self.start is None: + # this case represents `Content-Range: bytes -5` + self.start = self.total - self.end + self.end = self.total - 1 + if self.start >= self.end: + raise RangeNotSatisfiable( + "Invalid for Content Range parameters", self + ) + self.size = self.end - self.start + 1 + self.headers = { + "Content-Range": "bytes %s-%s/%s" + % (self.start, self.end, self.total) + } + + def __bool__(self): + return self.size > 0 diff --git a/sanic/handlers/directory.py b/sanic/handlers/directory.py new file mode 100644 index 00000000..56042118 --- /dev/null +++ b/sanic/handlers/directory.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +from datetime import datetime +from operator import itemgetter +from pathlib import Path +from stat import S_ISDIR +from typing import Any, Coroutine, Dict, Iterable, Optional, Union, cast + +from sanic.exceptions import SanicIsADirectoryError +from sanic.pages.autoindex import AutoIndex, FileInfo +from sanic.request import Request +from sanic.response import file, html +from sanic.response.types import HTTPResponse + + +class DirectoryHandler: + def __init__( + self, directory: Path, autoindex: bool, index_name: str + ) -> None: + self.directory = directory + self.autoindex = autoindex + self.index_name = index_name + + def handle(self): + index_file = self.directory / self.index_name + if self.autoindex and (not index_file.exists() or not self.index_name): + return self.index() + + if self.index_name: + return file(index_file) + + def index(self): + page = AutoIndex(self._iter_files()) + return html(page.render()) + + def _prepare_file(self, path: Path) -> Dict[str, Union[int, str]]: + stat = path.stat() + modified = datetime.fromtimestamp(stat.st_mtime) + is_dir = S_ISDIR(stat.st_mode) + icon = "📁" if is_dir else "📄" + file_name = path.name + if is_dir: + file_name += "/" + return { + "priority": is_dir * -1, + "file_name": file_name, + "icon": icon, + "file_access": modified.isoformat(), + "file_size": stat.st_size, + } + + def _iter_files(self) -> Iterable[FileInfo]: + prepared = [self._prepare_file(f) for f in self.directory.iterdir()] + for item in sorted(prepared, key=itemgetter("priority")): + del item["priority"] + yield cast(FileInfo, item) + + @classmethod + def default_handler( + cls, request: Request, exception: SanicIsADirectoryError + ) -> Optional[Union[HTTPResponse, Coroutine[Any, Any, HTTPResponse]]]: + if exception.autoindex or exception.index_name: + maybe_response = DirectoryHandler( + exception.location, exception.autoindex, exception.index_name + ).handle() + if maybe_response: + return maybe_response + return None diff --git a/sanic/handlers.py b/sanic/handlers/error.py similarity index 72% rename from sanic/handlers.py rename to sanic/handlers/error.py index e3aa8646..ad2ab5c4 100644 --- a/sanic/handlers.py +++ b/sanic/handlers/error.py @@ -3,11 +3,8 @@ from __future__ import annotations from typing import Dict, List, Optional, Tuple, Type from sanic.errorpages import BaseRenderer, TextRenderer, exception_response -from sanic.exceptions import ( - HeaderNotFound, - InvalidRangeType, - RangeNotSatisfiable, -) +from sanic.exceptions import SanicIsADirectoryError +from sanic.handlers.directory import DirectoryHandler from sanic.log import deprecation, error_logger from sanic.models.handler_types import RouteHandler from sanic.response import text @@ -23,16 +20,21 @@ class ErrorHandler: by the developers to perform a wide range of tasks from recording the error stats to reporting them to an external service that can be used for realtime alerting system. - """ + DEFAULT_HANDLERS = { + (SanicIsADirectoryError, None): DirectoryHandler.default_handler + } + def __init__( self, base: Type[BaseRenderer] = TextRenderer, ): self.cached_handlers: Dict[ Tuple[Type[BaseException], Optional[str]], Optional[RouteHandler] - ] = {} + ] = { + **self.DEFAULT_HANDLERS # type: ignore + } self.debug = False self.base = base @@ -196,74 +198,3 @@ class ErrorHandler: error_logger.exception( "Exception occurred while handling uri: %s", url ) - - -class ContentRangeHandler: - """ - A mechanism to parse and process the incoming request headers to - extract the content range information. - - :param request: Incoming api request - :param stats: Stats related to the content - - :type request: :class:`sanic.request.Request` - :type stats: :class:`posix.stat_result` - - :ivar start: Content Range start - :ivar end: Content Range end - :ivar size: Length of the content - :ivar total: Total size identified by the :class:`posix.stat_result` - instance - :ivar ContentRangeHandler.headers: Content range header ``dict`` - """ - - __slots__ = ("start", "end", "size", "total", "headers") - - def __init__(self, request, stats): - self.total = stats.st_size - _range = request.headers.getone("range", None) - if _range is None: - raise HeaderNotFound("Range Header Not Found") - unit, _, value = tuple(map(str.strip, _range.partition("="))) - if unit != "bytes": - raise InvalidRangeType( - "%s is not a valid Range Type" % (unit,), self - ) - start_b, _, end_b = tuple(map(str.strip, value.partition("-"))) - try: - self.start = int(start_b) if start_b else None - except ValueError: - raise RangeNotSatisfiable( - "'%s' is invalid for Content Range" % (start_b,), self - ) - try: - self.end = int(end_b) if end_b else None - except ValueError: - raise RangeNotSatisfiable( - "'%s' is invalid for Content Range" % (end_b,), self - ) - if self.end is None: - if self.start is None: - raise RangeNotSatisfiable( - "Invalid for Content Range parameters", self - ) - else: - # this case represents `Content-Range: bytes 5-` - self.end = self.total - 1 - else: - if self.start is None: - # this case represents `Content-Range: bytes -5` - self.start = self.total - self.end - self.end = self.total - 1 - if self.start >= self.end: - raise RangeNotSatisfiable( - "Invalid for Content Range parameters", self - ) - self.size = self.end - self.start + 1 - self.headers = { - "Content-Range": "bytes %s-%s/%s" - % (self.start, self.end, self.total) - } - - def __bool__(self): - return self.size > 0 diff --git a/sanic/pages/__init__.py b/sanic/pages/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sanic/pages/autoindex.py b/sanic/pages/autoindex.py new file mode 100644 index 00000000..df9b501f --- /dev/null +++ b/sanic/pages/autoindex.py @@ -0,0 +1,55 @@ +from textwrap import dedent +from typing import Iterable, TypedDict + +from html5tagger import E + +from .base import BasePage + + +class FileInfo(TypedDict): + icon: str + file_name: str + file_access: str + file_size: str + + +class AutoIndex(BasePage): + EXTRA_STYLE = dedent( + """ + table.autoindex td:first-child { width: 65% } + table.autoindex td:nth-child(2) { text-align: right; } + table.autoindex td:last-child { width: 215px; text-align: right; } + """ + ) + TITLE = "📁 File browser" + + def __init__(self, files: Iterable[FileInfo]) -> None: + super().__init__() + self.files = files + + def _body(self) -> None: + with self.doc.main: + self._headline() + self._file_table(self.files) + + def _headline(self): + self.doc.h1(self.TITLE) + + def _file_table(self, files: Iterable[FileInfo]): + with self.doc.table(class_="autoindex"): + self._parent() + for f in files: + self._file_row(**f) + + def _parent(self): + self._file_row("📁", "..", "", "") + + def _file_row( + self, + icon: str, + file_name: str, + file_access: str, + file_size: str, + ): + first = E.span(icon, class_="icon").a(file_name, href=file_name) + self.doc.tr.td(first).td(file_size).td(file_access) diff --git a/sanic/pages/base.py b/sanic/pages/base.py new file mode 100644 index 00000000..d88d922f --- /dev/null +++ b/sanic/pages/base.py @@ -0,0 +1,42 @@ +from abc import ABC, abstractmethod +from textwrap import dedent + +from html5tagger import Document + + +class BasePage(ABC): + BASE_STYLE = dedent( + """ + html { font-family: sans-serif; } + main { padding: 1rem; } + table { width: 100%; max-width: 1200px; } + td { font-family: monospace; } + span.icon { margin-right: 1rem; } + @media (prefers-color-scheme: dark) { + html { background: #111; color: #ccc; } + a { color: #ccc; } + a:visited { color: #777; } + } + """ + ) + EXTRA_STYLE = "" + TITLE = "Unknown" + + def __init__(self) -> None: + self.doc = Document(title=self.TITLE, lang="en") + + @property + def style(self) -> str: + return f"{self.BASE_STYLE}\n{self.EXTRA_STYLE}" + + def render(self) -> str: + self._head() + self._body() + return str(self.doc) + + def _head(self) -> None: + self.doc.head.title(self.TITLE).style(self.style) + + @abstractmethod + def _body(self) -> None: + ... diff --git a/sanic/response/convenience.py b/sanic/response/convenience.py index 214d8f78..9067f1b7 100644 --- a/sanic/response/convenience.py +++ b/sanic/response/convenience.py @@ -3,20 +3,18 @@ from __future__ import annotations from datetime import datetime, timezone from email.utils import formatdate, parsedate_to_datetime from mimetypes import guess_type -from operator import itemgetter from os import path from pathlib import Path, PurePath -from stat import S_ISDIR from time import time -from typing import Any, AnyStr, Callable, Dict, Iterable, Optional, Union, cast +from typing import Any, AnyStr, Callable, Dict, Optional, Union from urllib.parse import quote_plus from sanic.compat import Header, open_async, stat_async from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE +from sanic.exceptions import SanicIsADirectoryError from sanic.helpers import Default, _default from sanic.log import logger from sanic.models.protocol_types import HTMLProtocol, Range -from sanic.pages.autoindex_page import AutoIndexPage, FileInfo from .types import HTTPResponse, JSONResponse, ResponseStream @@ -242,14 +240,13 @@ async def file( status = 206 else: out_stream = await f.read() - except IsADirectoryError: - if autoindex or index_name: - maybe_response = await AutoIndex( - Path(location), autoindex, index_name - ).handle() - if maybe_response: - return maybe_response - raise + except IsADirectoryError as e: + exc = SanicIsADirectoryError(str(e)) + exc.location = Path(location) + exc.autoindex = autoindex + exc.index_name = index_name + + raise exc mime_type = mime_type or guess_type(filename)[0] or "text/plain" return HTTPResponse( @@ -345,46 +342,3 @@ async def file_stream( headers=headers, content_type=mime_type, ) - - -class AutoIndex: - def __init__( - self, directory: Path, autoindex: bool, index_name: str - ) -> None: - self.directory = directory - self.autoindex = autoindex - self.index_name = index_name - - async def handle(self): - index_file = self.directory / self.index_name - if self.autoindex and (not index_file.exists() or not self.index_name): - return await self.index() - - if self.index_name: - return await file(index_file) - - async def index(self): - page = AutoIndexPage(self._iter_files()) - return html(page.render()) - - def _prepare_file(self, path: Path) -> Dict[str, Union[int, str]]: - stat = path.stat() - modified = datetime.fromtimestamp(stat.st_mtime) - is_dir = S_ISDIR(stat.st_mode) - icon = "📁" if is_dir else "📄" - file_name = path.name - if is_dir: - file_name += "/" - return { - "priority": is_dir * -1, - "file_name": file_name, - "icon": icon, - "file_access": modified.isoformat(), - "file_size": stat.st_size, - } - - def _iter_files(self) -> Iterable[FileInfo]: - prepared = [self._prepare_file(f) for f in self.directory.iterdir()] - for item in sorted(prepared, key=itemgetter("priority")): - del item["priority"] - yield cast(FileInfo, item) diff --git a/sanic/worker/container.py b/sanic/worker/container.py new file mode 100644 index 00000000..0d65254b --- /dev/null +++ b/sanic/worker/container.py @@ -0,0 +1,17 @@ +from sanic.worker.loader import AppLoader + + +class AppContainer: + def __init__(self, loader: AppLoader) -> None: + self.loader = loader + + def prepare(self, *apps) -> None: + for app in apps: + app.prepare(**app._early_prepare) + + def serve(self) -> None: + from sanic import Sanic + + primary = self.loader.load() + self.prepare(primary) + Sanic.serve(primary=primary, app_loader=self.loader)