Refactor to allow for common pages

This commit is contained in:
Adam Hopkins 2023-01-26 16:50:45 +02:00
parent 39a4a75dcb
commit 2e36507a60
No known key found for this signature in database
GPG Key ID: 9F85EE6C807303FB
11 changed files with 300 additions and 135 deletions

View File

@ -12,6 +12,7 @@ Setting ``app.config.FALLBACK_ERROR_FORMAT = "auto"`` will enable a switch that
will attempt to provide an appropriate response format based upon the will attempt to provide an appropriate response format based upon the
request type. request type.
""" """
from __future__ import annotations
import sys import sys
import typing as t import typing as t
@ -21,8 +22,7 @@ from traceback import extract_tb
from sanic.exceptions import BadRequest, SanicException from sanic.exceptions import BadRequest, SanicException
from sanic.helpers import STATUS_CODES from sanic.helpers import STATUS_CODES
from sanic.request import Request from sanic.response import html, json, text
from sanic.response import HTTPResponse, html, json, text
dumps: t.Callable[..., str] dumps: t.Callable[..., str]
@ -33,6 +33,8 @@ try:
except ImportError: # noqa except ImportError: # noqa
from json import dumps from json import dumps
if t.TYPE_CHECKING:
from sanic import HTTPResponse, Request
DEFAULT_FORMAT = "auto" DEFAULT_FORMAT = "auto"
FALLBACK_TEXT = ( FALLBACK_TEXT = (

View File

@ -1,4 +1,5 @@
from asyncio import CancelledError from asyncio import CancelledError
from pathlib import Path
from typing import Any, Dict, Optional, Union from typing import Any, Dict, Optional, Union
from sanic.helpers import STATUS_CODES from sanic.helpers import STATUS_CODES
@ -269,3 +270,10 @@ class InvalidSignal(SanicException):
class WebsocketClosed(SanicException): class WebsocketClosed(SanicException):
quiet = True quiet = True
message = "Client has closed the websocket connection" message = "Client has closed the websocket connection"
class SanicIsADirectoryError(SanicException):
quiet = True
location: Path
autoindex: bool
index_name: str

View File

@ -0,0 +1,10 @@
from .content_range import ContentRangeHandler
from .directory import DirectoryHandler
from .error import ErrorHandler
__all__ = (
"ContentRangeHandler",
"DirectoryHandler",
"ErrorHandler",
)

View File

@ -0,0 +1,78 @@
from __future__ import annotations
from sanic.exceptions import (
HeaderNotFound,
InvalidRangeType,
RangeNotSatisfiable,
)
class ContentRangeHandler:
"""
A mechanism to parse and process the incoming request headers to
extract the content range information.
:param request: Incoming api request
:param stats: Stats related to the content
:type request: :class:`sanic.request.Request`
:type stats: :class:`posix.stat_result`
:ivar start: Content Range start
:ivar end: Content Range end
:ivar size: Length of the content
:ivar total: Total size identified by the :class:`posix.stat_result`
instance
:ivar ContentRangeHandler.headers: Content range header ``dict``
"""
__slots__ = ("start", "end", "size", "total", "headers")
def __init__(self, request, stats):
self.total = stats.st_size
_range = request.headers.getone("range", None)
if _range is None:
raise HeaderNotFound("Range Header Not Found")
unit, _, value = tuple(map(str.strip, _range.partition("=")))
if unit != "bytes":
raise InvalidRangeType(
"%s is not a valid Range Type" % (unit,), self
)
start_b, _, end_b = tuple(map(str.strip, value.partition("-")))
try:
self.start = int(start_b) if start_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (start_b,), self
)
try:
self.end = int(end_b) if end_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (end_b,), self
)
if self.end is None:
if self.start is None:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
else:
# this case represents `Content-Range: bytes 5-`
self.end = self.total - 1
else:
if self.start is None:
# this case represents `Content-Range: bytes -5`
self.start = self.total - self.end
self.end = self.total - 1
if self.start >= self.end:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
self.size = self.end - self.start + 1
self.headers = {
"Content-Range": "bytes %s-%s/%s"
% (self.start, self.end, self.total)
}
def __bool__(self):
return self.size > 0

View File

@ -0,0 +1,68 @@
from __future__ import annotations
from datetime import datetime
from operator import itemgetter
from pathlib import Path
from stat import S_ISDIR
from typing import Any, Coroutine, Dict, Iterable, Optional, Union, cast
from sanic.exceptions import SanicIsADirectoryError
from sanic.pages.autoindex import AutoIndex, FileInfo
from sanic.request import Request
from sanic.response import file, html
from sanic.response.types import HTTPResponse
class DirectoryHandler:
def __init__(
self, directory: Path, autoindex: bool, index_name: str
) -> None:
self.directory = directory
self.autoindex = autoindex
self.index_name = index_name
def handle(self):
index_file = self.directory / self.index_name
if self.autoindex and (not index_file.exists() or not self.index_name):
return self.index()
if self.index_name:
return file(index_file)
def index(self):
page = AutoIndex(self._iter_files())
return html(page.render())
def _prepare_file(self, path: Path) -> Dict[str, Union[int, str]]:
stat = path.stat()
modified = datetime.fromtimestamp(stat.st_mtime)
is_dir = S_ISDIR(stat.st_mode)
icon = "📁" if is_dir else "📄"
file_name = path.name
if is_dir:
file_name += "/"
return {
"priority": is_dir * -1,
"file_name": file_name,
"icon": icon,
"file_access": modified.isoformat(),
"file_size": stat.st_size,
}
def _iter_files(self) -> Iterable[FileInfo]:
prepared = [self._prepare_file(f) for f in self.directory.iterdir()]
for item in sorted(prepared, key=itemgetter("priority")):
del item["priority"]
yield cast(FileInfo, item)
@classmethod
def default_handler(
cls, request: Request, exception: SanicIsADirectoryError
) -> Optional[Union[HTTPResponse, Coroutine[Any, Any, HTTPResponse]]]:
if exception.autoindex or exception.index_name:
maybe_response = DirectoryHandler(
exception.location, exception.autoindex, exception.index_name
).handle()
if maybe_response:
return maybe_response
return None

View File

@ -3,11 +3,8 @@ from __future__ import annotations
from typing import Dict, List, Optional, Tuple, Type from typing import Dict, List, Optional, Tuple, Type
from sanic.errorpages import BaseRenderer, TextRenderer, exception_response from sanic.errorpages import BaseRenderer, TextRenderer, exception_response
from sanic.exceptions import ( from sanic.exceptions import SanicIsADirectoryError
HeaderNotFound, from sanic.handlers.directory import DirectoryHandler
InvalidRangeType,
RangeNotSatisfiable,
)
from sanic.log import deprecation, error_logger from sanic.log import deprecation, error_logger
from sanic.models.handler_types import RouteHandler from sanic.models.handler_types import RouteHandler
from sanic.response import text from sanic.response import text
@ -23,16 +20,21 @@ class ErrorHandler:
by the developers to perform a wide range of tasks from recording the error by the developers to perform a wide range of tasks from recording the error
stats to reporting them to an external service that can be used for stats to reporting them to an external service that can be used for
realtime alerting system. realtime alerting system.
""" """
DEFAULT_HANDLERS = {
(SanicIsADirectoryError, None): DirectoryHandler.default_handler
}
def __init__( def __init__(
self, self,
base: Type[BaseRenderer] = TextRenderer, base: Type[BaseRenderer] = TextRenderer,
): ):
self.cached_handlers: Dict[ self.cached_handlers: Dict[
Tuple[Type[BaseException], Optional[str]], Optional[RouteHandler] Tuple[Type[BaseException], Optional[str]], Optional[RouteHandler]
] = {} ] = {
**self.DEFAULT_HANDLERS # type: ignore
}
self.debug = False self.debug = False
self.base = base self.base = base
@ -196,74 +198,3 @@ class ErrorHandler:
error_logger.exception( error_logger.exception(
"Exception occurred while handling uri: %s", url "Exception occurred while handling uri: %s", url
) )
class ContentRangeHandler:
"""
A mechanism to parse and process the incoming request headers to
extract the content range information.
:param request: Incoming api request
:param stats: Stats related to the content
:type request: :class:`sanic.request.Request`
:type stats: :class:`posix.stat_result`
:ivar start: Content Range start
:ivar end: Content Range end
:ivar size: Length of the content
:ivar total: Total size identified by the :class:`posix.stat_result`
instance
:ivar ContentRangeHandler.headers: Content range header ``dict``
"""
__slots__ = ("start", "end", "size", "total", "headers")
def __init__(self, request, stats):
self.total = stats.st_size
_range = request.headers.getone("range", None)
if _range is None:
raise HeaderNotFound("Range Header Not Found")
unit, _, value = tuple(map(str.strip, _range.partition("=")))
if unit != "bytes":
raise InvalidRangeType(
"%s is not a valid Range Type" % (unit,), self
)
start_b, _, end_b = tuple(map(str.strip, value.partition("-")))
try:
self.start = int(start_b) if start_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (start_b,), self
)
try:
self.end = int(end_b) if end_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (end_b,), self
)
if self.end is None:
if self.start is None:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
else:
# this case represents `Content-Range: bytes 5-`
self.end = self.total - 1
else:
if self.start is None:
# this case represents `Content-Range: bytes -5`
self.start = self.total - self.end
self.end = self.total - 1
if self.start >= self.end:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
self.size = self.end - self.start + 1
self.headers = {
"Content-Range": "bytes %s-%s/%s"
% (self.start, self.end, self.total)
}
def __bool__(self):
return self.size > 0

0
sanic/pages/__init__.py Normal file
View File

55
sanic/pages/autoindex.py Normal file
View File

@ -0,0 +1,55 @@
from textwrap import dedent
from typing import Iterable, TypedDict
from html5tagger import E
from .base import BasePage
class FileInfo(TypedDict):
icon: str
file_name: str
file_access: str
file_size: str
class AutoIndex(BasePage):
EXTRA_STYLE = dedent(
"""
table.autoindex td:first-child { width: 65% }
table.autoindex td:nth-child(2) { text-align: right; }
table.autoindex td:last-child { width: 215px; text-align: right; }
"""
)
TITLE = "📁 File browser"
def __init__(self, files: Iterable[FileInfo]) -> None:
super().__init__()
self.files = files
def _body(self) -> None:
with self.doc.main:
self._headline()
self._file_table(self.files)
def _headline(self):
self.doc.h1(self.TITLE)
def _file_table(self, files: Iterable[FileInfo]):
with self.doc.table(class_="autoindex"):
self._parent()
for f in files:
self._file_row(**f)
def _parent(self):
self._file_row("📁", "..", "", "")
def _file_row(
self,
icon: str,
file_name: str,
file_access: str,
file_size: str,
):
first = E.span(icon, class_="icon").a(file_name, href=file_name)
self.doc.tr.td(first).td(file_size).td(file_access)

42
sanic/pages/base.py Normal file
View File

@ -0,0 +1,42 @@
from abc import ABC, abstractmethod
from textwrap import dedent
from html5tagger import Document
class BasePage(ABC):
BASE_STYLE = dedent(
"""
html { font-family: sans-serif; }
main { padding: 1rem; }
table { width: 100%; max-width: 1200px; }
td { font-family: monospace; }
span.icon { margin-right: 1rem; }
@media (prefers-color-scheme: dark) {
html { background: #111; color: #ccc; }
a { color: #ccc; }
a:visited { color: #777; }
}
"""
)
EXTRA_STYLE = ""
TITLE = "Unknown"
def __init__(self) -> None:
self.doc = Document(title=self.TITLE, lang="en")
@property
def style(self) -> str:
return f"{self.BASE_STYLE}\n{self.EXTRA_STYLE}"
def render(self) -> str:
self._head()
self._body()
return str(self.doc)
def _head(self) -> None:
self.doc.head.title(self.TITLE).style(self.style)
@abstractmethod
def _body(self) -> None:
...

View File

@ -3,20 +3,18 @@ from __future__ import annotations
from datetime import datetime, timezone from datetime import datetime, timezone
from email.utils import formatdate, parsedate_to_datetime from email.utils import formatdate, parsedate_to_datetime
from mimetypes import guess_type from mimetypes import guess_type
from operator import itemgetter
from os import path from os import path
from pathlib import Path, PurePath from pathlib import Path, PurePath
from stat import S_ISDIR
from time import time from time import time
from typing import Any, AnyStr, Callable, Dict, Iterable, Optional, Union, cast from typing import Any, AnyStr, Callable, Dict, Optional, Union
from urllib.parse import quote_plus from urllib.parse import quote_plus
from sanic.compat import Header, open_async, stat_async from sanic.compat import Header, open_async, stat_async
from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE
from sanic.exceptions import SanicIsADirectoryError
from sanic.helpers import Default, _default from sanic.helpers import Default, _default
from sanic.log import logger from sanic.log import logger
from sanic.models.protocol_types import HTMLProtocol, Range from sanic.models.protocol_types import HTMLProtocol, Range
from sanic.pages.autoindex_page import AutoIndexPage, FileInfo
from .types import HTTPResponse, JSONResponse, ResponseStream from .types import HTTPResponse, JSONResponse, ResponseStream
@ -242,14 +240,13 @@ async def file(
status = 206 status = 206
else: else:
out_stream = await f.read() out_stream = await f.read()
except IsADirectoryError: except IsADirectoryError as e:
if autoindex or index_name: exc = SanicIsADirectoryError(str(e))
maybe_response = await AutoIndex( exc.location = Path(location)
Path(location), autoindex, index_name exc.autoindex = autoindex
).handle() exc.index_name = index_name
if maybe_response:
return maybe_response raise exc
raise
mime_type = mime_type or guess_type(filename)[0] or "text/plain" mime_type = mime_type or guess_type(filename)[0] or "text/plain"
return HTTPResponse( return HTTPResponse(
@ -345,46 +342,3 @@ async def file_stream(
headers=headers, headers=headers,
content_type=mime_type, content_type=mime_type,
) )
class AutoIndex:
def __init__(
self, directory: Path, autoindex: bool, index_name: str
) -> None:
self.directory = directory
self.autoindex = autoindex
self.index_name = index_name
async def handle(self):
index_file = self.directory / self.index_name
if self.autoindex and (not index_file.exists() or not self.index_name):
return await self.index()
if self.index_name:
return await file(index_file)
async def index(self):
page = AutoIndexPage(self._iter_files())
return html(page.render())
def _prepare_file(self, path: Path) -> Dict[str, Union[int, str]]:
stat = path.stat()
modified = datetime.fromtimestamp(stat.st_mtime)
is_dir = S_ISDIR(stat.st_mode)
icon = "📁" if is_dir else "📄"
file_name = path.name
if is_dir:
file_name += "/"
return {
"priority": is_dir * -1,
"file_name": file_name,
"icon": icon,
"file_access": modified.isoformat(),
"file_size": stat.st_size,
}
def _iter_files(self) -> Iterable[FileInfo]:
prepared = [self._prepare_file(f) for f in self.directory.iterdir()]
for item in sorted(prepared, key=itemgetter("priority")):
del item["priority"]
yield cast(FileInfo, item)

17
sanic/worker/container.py Normal file
View File

@ -0,0 +1,17 @@
from sanic.worker.loader import AppLoader
class AppContainer:
def __init__(self, loader: AppLoader) -> None:
self.loader = loader
def prepare(self, *apps) -> None:
for app in apps:
app.prepare(**app._early_prepare)
def serve(self) -> None:
from sanic import Sanic
primary = self.loader.load()
self.prepare(primary)
Sanic.serve(primary=primary, app_loader=self.loader)