Compare commits
1 Commits
sml-change
...
22.12LTS
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fc82b2334b |
@@ -9,7 +9,6 @@ omit =
|
||||
sanic/simple.py
|
||||
sanic/utils.py
|
||||
sanic/cli
|
||||
sanic/pages
|
||||
|
||||
[html]
|
||||
directory = coverage
|
||||
|
||||
@@ -17,8 +17,7 @@ ignore:
|
||||
- "sanic/compat.py"
|
||||
- "sanic/simple.py"
|
||||
- "sanic/utils.py"
|
||||
- "sanic/cli/"
|
||||
- "sanic/pages/"
|
||||
- "sanic/cli"
|
||||
- ".github/"
|
||||
- "changelogs/"
|
||||
- "docker/"
|
||||
|
||||
@@ -23,6 +23,5 @@ module = [
|
||||
"trustme.*",
|
||||
"sanic_routing.*",
|
||||
"aioquic.*",
|
||||
"html5tagger.*",
|
||||
]
|
||||
ignore_missing_imports = true
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "23.3.0"
|
||||
__version__ = "22.12.0"
|
||||
|
||||
15
sanic/app.py
15
sanic/app.py
@@ -72,7 +72,6 @@ from sanic.log import (
|
||||
from sanic.middleware import Middleware, MiddlewareLocation
|
||||
from sanic.mixins.listeners import ListenerEvent
|
||||
from sanic.mixins.startup import StartupMixin
|
||||
from sanic.mixins.static import StaticHandleMixin
|
||||
from sanic.models.futures import (
|
||||
FutureException,
|
||||
FutureListener,
|
||||
@@ -80,6 +79,7 @@ from sanic.models.futures import (
|
||||
FutureRegistry,
|
||||
FutureRoute,
|
||||
FutureSignal,
|
||||
FutureStatic,
|
||||
)
|
||||
from sanic.models.handler_types import ListenerType, MiddlewareType
|
||||
from sanic.models.handler_types import Sanic as SanicVar
|
||||
@@ -106,7 +106,7 @@ if OS_IS_WINDOWS: # no cov
|
||||
enable_windows_color_support()
|
||||
|
||||
|
||||
class Sanic(StaticHandleMixin, BaseSanic, StartupMixin, metaclass=TouchUpMeta):
|
||||
class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
|
||||
"""
|
||||
The main application instance
|
||||
"""
|
||||
@@ -157,7 +157,6 @@ class Sanic(StaticHandleMixin, BaseSanic, StartupMixin, metaclass=TouchUpMeta):
|
||||
"strict_slashes",
|
||||
"websocket_enabled",
|
||||
"websocket_tasks",
|
||||
"wrappers",
|
||||
)
|
||||
|
||||
_app_registry: Dict[str, "Sanic"] = {}
|
||||
@@ -442,6 +441,9 @@ class Sanic(StaticHandleMixin, BaseSanic, StartupMixin, metaclass=TouchUpMeta):
|
||||
|
||||
return routes
|
||||
|
||||
def _apply_static(self, static: FutureStatic) -> Route:
|
||||
return self._register_static(static)
|
||||
|
||||
def _apply_middleware(
|
||||
self,
|
||||
middleware: FutureMiddleware,
|
||||
@@ -876,8 +878,6 @@ class Sanic(StaticHandleMixin, BaseSanic, StartupMixin, metaclass=TouchUpMeta):
|
||||
:param request: HTTP Request object
|
||||
:return: Nothing
|
||||
"""
|
||||
__tracebackhide__ = True
|
||||
|
||||
await self.dispatch(
|
||||
"http.lifecycle.handle",
|
||||
inline=True,
|
||||
@@ -890,7 +890,6 @@ class Sanic(StaticHandleMixin, BaseSanic, StartupMixin, metaclass=TouchUpMeta):
|
||||
Union[
|
||||
BaseHTTPResponse,
|
||||
Coroutine[Any, Any, Optional[BaseHTTPResponse]],
|
||||
ResponseStream,
|
||||
]
|
||||
] = None
|
||||
run_middleware = True
|
||||
@@ -999,7 +998,7 @@ class Sanic(StaticHandleMixin, BaseSanic, StartupMixin, metaclass=TouchUpMeta):
|
||||
...
|
||||
await response.send(end_stream=True)
|
||||
elif isinstance(response, ResponseStream):
|
||||
resp = await response(request)
|
||||
resp = await response(request) # type: ignore
|
||||
await self.dispatch(
|
||||
"http.lifecycle.response",
|
||||
inline=True,
|
||||
@@ -1008,7 +1007,7 @@ class Sanic(StaticHandleMixin, BaseSanic, StartupMixin, metaclass=TouchUpMeta):
|
||||
"response": resp,
|
||||
},
|
||||
)
|
||||
await response.eof()
|
||||
await response.eof() # type: ignore
|
||||
else:
|
||||
if not hasattr(handler, "is_websocket"):
|
||||
raise ServerError(
|
||||
|
||||
@@ -40,8 +40,6 @@ FULL_COLOR_LOGO = """
|
||||
|
||||
""" # noqa
|
||||
|
||||
SVG_LOGO_SIMPLE = """<svg id=logo-simple viewBox="0 0 964 279"><desc>Sanic</desc><path d="M107 222c9-2 10-20 1-22s-20-2-30-2-17 7-16 14 6 10 15 10h30zm115-1c16-2 30-11 35-23s6-24 2-33-6-14-15-20-24-11-38-10c-7 3-10 13-5 19s17-1 24 4 15 14 13 24-5 15-14 18-50 0-74 0h-17c-6 4-10 15-4 20s16 2 23 3zM251 83q9-1 9-7 0-15-10-16h-13c-10 6-10 20 0 22zM147 60c-4 0-10 3-11 11s5 13 10 12 42 0 67 0c5-3 7-10 6-15s-4-8-9-8zm-33 1c-8 0-16 0-24 3s-20 10-25 20-6 24-4 36 15 22 26 27 78 8 94 3c4-4 4-12 0-18s-69 8-93-10c-8-7-9-23 0-30s12-10 20-10 12 2 16-3 1-15-5-18z" fill="#ff0d68"/><path d="M676 74c0-14-18-9-20 0s0 30 0 39 20 9 20 2zm-297-10c-12 2-15 12-23 23l-41 58H340l22-30c8-12 23-13 30-4s20 24 24 38-10 10-17 10l-68 2q-17 1-48 30c-7 6-10 20 0 24s15-8 20-13 20 -20 58-21h50 c20 2 33 9 52 30 8 10 24-4 16-13L384 65q-3-2-5-1zm131 0c-10 1-12 12-11 20v96c1 10-3 23 5 32s20-5 17-15c0-23-3-46 2-67 5-12 22-14 32-5l103 87c7 5 19 1 18-9v-64c-3-10-20-9-21 2s-20 22-30 13l-97-80c-5-4-10-10-18-10zM701 76v128c2 10 15 12 20 4s0-102 0-124s-20-18-20-7z M850 63c-35 0-69-2-86 15s-20 60-13 66 13 8 16 0 1-10 1-27 12-26 20-32 66-5 85-5 31 4 31-10-18-7-54-7M764 159c-6-2-15-2-16 12s19 37 33 43 23 8 25-4-4-11-11-14q-9-3-22-18c-4-7-3-16-10-19zM828 196c-4 0-8 1-10 5s-4 12 0 15 8 2 12 2h60c5 0 10-2 12-6 3-7-1-16-8-16z" fill="#1f1f1f"/></svg>""" # noqa
|
||||
|
||||
ansi_pattern = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
|
||||
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@ from sanic.mixins.listeners import ListenerMixin
|
||||
from sanic.mixins.middleware import MiddlewareMixin
|
||||
from sanic.mixins.routes import RouteMixin
|
||||
from sanic.mixins.signals import SignalMixin
|
||||
from sanic.mixins.static import StaticMixin
|
||||
|
||||
|
||||
VALID_NAME = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_\-]*$")
|
||||
@@ -17,7 +16,6 @@ VALID_NAME = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_\-]*$")
|
||||
|
||||
class BaseSanic(
|
||||
RouteMixin,
|
||||
StaticMixin,
|
||||
MiddlewareMixin,
|
||||
ListenerMixin,
|
||||
ExceptionMixin,
|
||||
|
||||
@@ -105,7 +105,6 @@ class Blueprint(BaseSanic):
|
||||
"version",
|
||||
"version_prefix",
|
||||
"websocket_routes",
|
||||
"wrappers",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
@@ -305,6 +304,9 @@ class Blueprint(BaseSanic):
|
||||
|
||||
# Routes
|
||||
for future in self._future_routes:
|
||||
# attach the blueprint name to the handler so that it can be
|
||||
# prefixed properly in the router
|
||||
future.handler.__blueprintname__ = self.name
|
||||
# Prepend the blueprint URI prefix if available
|
||||
uri = self._setup_uri(future.uri, url_prefix)
|
||||
|
||||
|
||||
@@ -12,7 +12,6 @@ Setting ``app.config.FALLBACK_ERROR_FORMAT = "auto"`` will enable a switch that
|
||||
will attempt to provide an appropriate response format based upon the
|
||||
request type.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import typing as t
|
||||
@@ -22,8 +21,8 @@ from traceback import extract_tb
|
||||
|
||||
from sanic.exceptions import BadRequest, SanicException
|
||||
from sanic.helpers import STATUS_CODES
|
||||
from sanic.pages.error import ErrorPage
|
||||
from sanic.response import html, json, text
|
||||
from sanic.request import Request
|
||||
from sanic.response import HTTPResponse, html, json, text
|
||||
|
||||
|
||||
dumps: t.Callable[..., str]
|
||||
@@ -34,8 +33,6 @@ try:
|
||||
except ImportError: # noqa
|
||||
from json import dumps
|
||||
|
||||
if t.TYPE_CHECKING:
|
||||
from sanic import HTTPResponse, Request
|
||||
|
||||
DEFAULT_FORMAT = "auto"
|
||||
FALLBACK_TEXT = (
|
||||
@@ -160,21 +157,36 @@ class HTMLRenderer(BaseRenderer):
|
||||
"{body}"
|
||||
)
|
||||
|
||||
def _page(self, full: bool) -> HTTPResponse:
|
||||
page = ErrorPage(
|
||||
title=super().title,
|
||||
text=super().text,
|
||||
request=self.request,
|
||||
exc=self.exception,
|
||||
full=full,
|
||||
)
|
||||
return html(page.render(), status=self.status, headers=self.headers)
|
||||
|
||||
def full(self) -> HTTPResponse:
|
||||
return self._page(full=True)
|
||||
return html(
|
||||
self.OUTPUT_HTML.format(
|
||||
title=self.title,
|
||||
text=self.text,
|
||||
style=self.TRACEBACK_STYLE,
|
||||
body=self._generate_body(full=True),
|
||||
),
|
||||
status=self.status,
|
||||
)
|
||||
|
||||
def minimal(self) -> HTTPResponse:
|
||||
return self._page(full=False)
|
||||
return html(
|
||||
self.OUTPUT_HTML.format(
|
||||
title=self.title,
|
||||
text=self.text,
|
||||
style=self.TRACEBACK_STYLE,
|
||||
body=self._generate_body(full=False),
|
||||
),
|
||||
status=self.status,
|
||||
headers=self.headers,
|
||||
)
|
||||
|
||||
@property
|
||||
def text(self):
|
||||
return escape(super().text)
|
||||
|
||||
@property
|
||||
def title(self):
|
||||
return escape(f"⚠️ {super().title}")
|
||||
|
||||
def _generate_body(self, *, full):
|
||||
lines = []
|
||||
@@ -392,13 +404,16 @@ CONTENT_TYPE_BY_RENDERERS = {
|
||||
v: k for k, v in RENDERERS_BY_CONTENT_TYPE.items()
|
||||
}
|
||||
|
||||
# Handler source code is checked for which response types it returns with the
|
||||
# route error_format="auto" (default) to determine which format to use.
|
||||
RESPONSE_MAPPING = {
|
||||
"empty": "html",
|
||||
"json": "json",
|
||||
"text": "text",
|
||||
"raw": "text",
|
||||
"html": "html",
|
||||
"JSONResponse": "json",
|
||||
"file": "html",
|
||||
"file_stream": "text",
|
||||
"stream": "text",
|
||||
"redirect": "html",
|
||||
"text/plain": "text",
|
||||
"text/html": "html",
|
||||
"application/json": "json",
|
||||
|
||||
@@ -3,6 +3,11 @@ from __future__ import annotations
|
||||
from typing import Dict, List, Optional, Tuple, Type
|
||||
|
||||
from sanic.errorpages import BaseRenderer, TextRenderer, exception_response
|
||||
from sanic.exceptions import (
|
||||
HeaderNotFound,
|
||||
InvalidRangeType,
|
||||
RangeNotSatisfiable,
|
||||
)
|
||||
from sanic.log import deprecation, error_logger
|
||||
from sanic.models.handler_types import RouteHandler
|
||||
from sanic.response import text
|
||||
@@ -18,6 +23,7 @@ class ErrorHandler:
|
||||
by the developers to perform a wide range of tasks from recording the error
|
||||
stats to reporting them to an external service that can be used for
|
||||
realtime alerting system.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -190,3 +196,74 @@ class ErrorHandler:
|
||||
error_logger.exception(
|
||||
"Exception occurred while handling uri: %s", url
|
||||
)
|
||||
|
||||
|
||||
class ContentRangeHandler:
|
||||
"""
|
||||
A mechanism to parse and process the incoming request headers to
|
||||
extract the content range information.
|
||||
|
||||
:param request: Incoming api request
|
||||
:param stats: Stats related to the content
|
||||
|
||||
:type request: :class:`sanic.request.Request`
|
||||
:type stats: :class:`posix.stat_result`
|
||||
|
||||
:ivar start: Content Range start
|
||||
:ivar end: Content Range end
|
||||
:ivar size: Length of the content
|
||||
:ivar total: Total size identified by the :class:`posix.stat_result`
|
||||
instance
|
||||
:ivar ContentRangeHandler.headers: Content range header ``dict``
|
||||
"""
|
||||
|
||||
__slots__ = ("start", "end", "size", "total", "headers")
|
||||
|
||||
def __init__(self, request, stats):
|
||||
self.total = stats.st_size
|
||||
_range = request.headers.getone("range", None)
|
||||
if _range is None:
|
||||
raise HeaderNotFound("Range Header Not Found")
|
||||
unit, _, value = tuple(map(str.strip, _range.partition("=")))
|
||||
if unit != "bytes":
|
||||
raise InvalidRangeType(
|
||||
"%s is not a valid Range Type" % (unit,), self
|
||||
)
|
||||
start_b, _, end_b = tuple(map(str.strip, value.partition("-")))
|
||||
try:
|
||||
self.start = int(start_b) if start_b else None
|
||||
except ValueError:
|
||||
raise RangeNotSatisfiable(
|
||||
"'%s' is invalid for Content Range" % (start_b,), self
|
||||
)
|
||||
try:
|
||||
self.end = int(end_b) if end_b else None
|
||||
except ValueError:
|
||||
raise RangeNotSatisfiable(
|
||||
"'%s' is invalid for Content Range" % (end_b,), self
|
||||
)
|
||||
if self.end is None:
|
||||
if self.start is None:
|
||||
raise RangeNotSatisfiable(
|
||||
"Invalid for Content Range parameters", self
|
||||
)
|
||||
else:
|
||||
# this case represents `Content-Range: bytes 5-`
|
||||
self.end = self.total - 1
|
||||
else:
|
||||
if self.start is None:
|
||||
# this case represents `Content-Range: bytes -5`
|
||||
self.start = self.total - self.end
|
||||
self.end = self.total - 1
|
||||
if self.start >= self.end:
|
||||
raise RangeNotSatisfiable(
|
||||
"Invalid for Content Range parameters", self
|
||||
)
|
||||
self.size = self.end - self.start + 1
|
||||
self.headers = {
|
||||
"Content-Range": "bytes %s-%s/%s"
|
||||
% (self.start, self.end, self.total)
|
||||
}
|
||||
|
||||
def __bool__(self):
|
||||
return self.size > 0
|
||||
@@ -1,10 +0,0 @@
|
||||
from .content_range import ContentRangeHandler
|
||||
from .directory import DirectoryHandler
|
||||
from .error import ErrorHandler
|
||||
|
||||
|
||||
__all__ = (
|
||||
"ContentRangeHandler",
|
||||
"DirectoryHandler",
|
||||
"ErrorHandler",
|
||||
)
|
||||
@@ -1,78 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from sanic.exceptions import (
|
||||
HeaderNotFound,
|
||||
InvalidRangeType,
|
||||
RangeNotSatisfiable,
|
||||
)
|
||||
|
||||
|
||||
class ContentRangeHandler:
|
||||
"""
|
||||
A mechanism to parse and process the incoming request headers to
|
||||
extract the content range information.
|
||||
|
||||
:param request: Incoming api request
|
||||
:param stats: Stats related to the content
|
||||
|
||||
:type request: :class:`sanic.request.Request`
|
||||
:type stats: :class:`posix.stat_result`
|
||||
|
||||
:ivar start: Content Range start
|
||||
:ivar end: Content Range end
|
||||
:ivar size: Length of the content
|
||||
:ivar total: Total size identified by the :class:`posix.stat_result`
|
||||
instance
|
||||
:ivar ContentRangeHandler.headers: Content range header ``dict``
|
||||
"""
|
||||
|
||||
__slots__ = ("start", "end", "size", "total", "headers")
|
||||
|
||||
def __init__(self, request, stats):
|
||||
self.total = stats.st_size
|
||||
_range = request.headers.getone("range", None)
|
||||
if _range is None:
|
||||
raise HeaderNotFound("Range Header Not Found")
|
||||
unit, _, value = tuple(map(str.strip, _range.partition("=")))
|
||||
if unit != "bytes":
|
||||
raise InvalidRangeType(
|
||||
"%s is not a valid Range Type" % (unit,), self
|
||||
)
|
||||
start_b, _, end_b = tuple(map(str.strip, value.partition("-")))
|
||||
try:
|
||||
self.start = int(start_b) if start_b else None
|
||||
except ValueError:
|
||||
raise RangeNotSatisfiable(
|
||||
"'%s' is invalid for Content Range" % (start_b,), self
|
||||
)
|
||||
try:
|
||||
self.end = int(end_b) if end_b else None
|
||||
except ValueError:
|
||||
raise RangeNotSatisfiable(
|
||||
"'%s' is invalid for Content Range" % (end_b,), self
|
||||
)
|
||||
if self.end is None:
|
||||
if self.start is None:
|
||||
raise RangeNotSatisfiable(
|
||||
"Invalid for Content Range parameters", self
|
||||
)
|
||||
else:
|
||||
# this case represents `Content-Range: bytes 5-`
|
||||
self.end = self.total - 1
|
||||
else:
|
||||
if self.start is None:
|
||||
# this case represents `Content-Range: bytes -5`
|
||||
self.start = self.total - self.end
|
||||
self.end = self.total - 1
|
||||
if self.start >= self.end:
|
||||
raise RangeNotSatisfiable(
|
||||
"Invalid for Content Range parameters", self
|
||||
)
|
||||
self.size = self.end - self.start + 1
|
||||
self.headers = {
|
||||
"Content-Range": "bytes %s-%s/%s"
|
||||
% (self.start, self.end, self.total)
|
||||
}
|
||||
|
||||
def __bool__(self):
|
||||
return self.size > 0
|
||||
@@ -1,84 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
from operator import itemgetter
|
||||
from pathlib import Path
|
||||
from stat import S_ISDIR
|
||||
from typing import Dict, Iterable, Optional, Sequence, Union, cast
|
||||
|
||||
from sanic.exceptions import NotFound
|
||||
from sanic.pages.directory_page import DirectoryPage, FileInfo
|
||||
from sanic.request import Request
|
||||
from sanic.response import file, html, redirect
|
||||
|
||||
|
||||
class DirectoryHandler:
|
||||
def __init__(
|
||||
self,
|
||||
uri: str,
|
||||
directory: Path,
|
||||
directory_view: bool = False,
|
||||
index: Optional[Union[str, Sequence[str]]] = None,
|
||||
) -> None:
|
||||
if isinstance(index, str):
|
||||
index = [index]
|
||||
elif index is None:
|
||||
index = []
|
||||
self.base = uri.strip("/")
|
||||
self.directory = directory
|
||||
self.directory_view = directory_view
|
||||
self.index = tuple(index)
|
||||
|
||||
async def handle(self, request: Request, path: str):
|
||||
current = path.strip("/")[len(self.base) :].strip("/") # noqa: E203
|
||||
for file_name in self.index:
|
||||
index_file = self.directory / current / file_name
|
||||
if index_file.is_file():
|
||||
return await file(index_file)
|
||||
|
||||
if self.directory_view:
|
||||
return self._index(
|
||||
self.directory / current, path, request.app.debug
|
||||
)
|
||||
|
||||
if self.index:
|
||||
raise NotFound("File not found")
|
||||
|
||||
raise IsADirectoryError(f"{self.directory.as_posix()} is a directory")
|
||||
|
||||
def _index(self, location: Path, path: str, debug: bool):
|
||||
# Remove empty path elements, append slash
|
||||
if "//" in path or not path.endswith("/"):
|
||||
return redirect(
|
||||
"/" + "".join([f"{p}/" for p in path.split("/") if p])
|
||||
)
|
||||
|
||||
# Render file browser
|
||||
page = DirectoryPage(self._iter_files(location), path, debug)
|
||||
return html(page.render())
|
||||
|
||||
def _prepare_file(self, path: Path) -> Dict[str, Union[int, str]]:
|
||||
stat = path.stat()
|
||||
modified = (
|
||||
datetime.fromtimestamp(stat.st_mtime)
|
||||
.isoformat()[:19]
|
||||
.replace("T", " ")
|
||||
)
|
||||
is_dir = S_ISDIR(stat.st_mode)
|
||||
icon = "📁" if is_dir else "📄"
|
||||
file_name = path.name
|
||||
if is_dir:
|
||||
file_name += "/"
|
||||
return {
|
||||
"priority": is_dir * -1,
|
||||
"file_name": file_name,
|
||||
"icon": icon,
|
||||
"file_access": modified,
|
||||
"file_size": stat.st_size,
|
||||
}
|
||||
|
||||
def _iter_files(self, location: Path) -> Iterable[FileInfo]:
|
||||
prepared = [self._prepare_file(f) for f in location.iterdir()]
|
||||
for item in sorted(prepared, key=itemgetter("priority", "file_name")):
|
||||
del item["priority"]
|
||||
yield cast(FileInfo, item)
|
||||
@@ -1,35 +0,0 @@
|
||||
from typing import Optional
|
||||
|
||||
from sanic.base.meta import SanicMeta
|
||||
|
||||
|
||||
class BaseMixin(metaclass=SanicMeta):
|
||||
name: str
|
||||
strict_slashes: Optional[bool]
|
||||
|
||||
def _generate_name(self, *objects) -> str:
|
||||
name = None
|
||||
|
||||
for obj in objects:
|
||||
if obj:
|
||||
if isinstance(obj, str):
|
||||
name = obj
|
||||
break
|
||||
|
||||
try:
|
||||
name = obj.name
|
||||
except AttributeError:
|
||||
try:
|
||||
name = obj.__name__
|
||||
except AttributeError:
|
||||
continue
|
||||
else:
|
||||
break
|
||||
|
||||
if not name: # noqa
|
||||
raise ValueError("Could not generate a name for handler")
|
||||
|
||||
if not name.startswith(f"{self.name}."):
|
||||
name = f"{self.name}.{name}"
|
||||
|
||||
return name
|
||||
@@ -14,7 +14,6 @@ class MiddlewareMixin(metaclass=SanicMeta):
|
||||
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
self._future_middleware: List[FutureMiddleware] = []
|
||||
self.wrappers = []
|
||||
|
||||
def _apply_middleware(self, middleware: FutureMiddleware):
|
||||
raise NotImplementedError # noqa
|
||||
@@ -141,7 +140,3 @@ class MiddlewareMixin(metaclass=SanicMeta):
|
||||
reverse=True,
|
||||
)[::-1]
|
||||
)
|
||||
|
||||
def wrap(self, handler):
|
||||
self.wrappers.append(handler)
|
||||
return handler
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
from ast import NodeVisitor, Return, parse
|
||||
from contextlib import suppress
|
||||
from email.utils import formatdate
|
||||
from functools import partial, wraps
|
||||
from inspect import getsource, signature
|
||||
from mimetypes import guess_type
|
||||
from os import path
|
||||
from pathlib import Path, PurePath
|
||||
from textwrap import dedent
|
||||
from typing import (
|
||||
Any,
|
||||
@@ -14,15 +19,20 @@ from typing import (
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
from urllib.parse import unquote
|
||||
|
||||
from sanic_routing.route import Route
|
||||
|
||||
from sanic.base.meta import SanicMeta
|
||||
from sanic.constants import HTTP_METHODS
|
||||
from sanic.compat import stat_async
|
||||
from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE, HTTP_METHODS
|
||||
from sanic.errorpages import RESPONSE_MAPPING
|
||||
from sanic.mixins.base import BaseMixin
|
||||
from sanic.exceptions import FileNotFound, HeaderNotFound, RangeNotSatisfiable
|
||||
from sanic.handlers import ContentRangeHandler
|
||||
from sanic.log import error_logger
|
||||
from sanic.models.futures import FutureRoute, FutureStatic
|
||||
from sanic.models.handler_types import RouteHandler
|
||||
from sanic.response import HTTPResponse, file, file_stream, validate_file
|
||||
from sanic.types import HashableDict
|
||||
|
||||
|
||||
@@ -31,14 +41,20 @@ RouteWrapper = Callable[
|
||||
]
|
||||
|
||||
|
||||
class RouteMixin(BaseMixin, metaclass=SanicMeta):
|
||||
class RouteMixin(metaclass=SanicMeta):
|
||||
name: str
|
||||
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
self._future_routes: Set[FutureRoute] = set()
|
||||
self._future_statics: Set[FutureStatic] = set()
|
||||
self.strict_slashes: Optional[bool] = False
|
||||
|
||||
def _apply_route(self, route: FutureRoute) -> List[Route]:
|
||||
raise NotImplementedError # noqa
|
||||
|
||||
def _apply_static(self, static: FutureStatic) -> Route:
|
||||
raise NotImplementedError # noqa
|
||||
|
||||
def route(
|
||||
self,
|
||||
uri: str,
|
||||
@@ -672,6 +688,324 @@ class RouteMixin(BaseMixin, metaclass=SanicMeta):
|
||||
**ctx_kwargs,
|
||||
)(handler)
|
||||
|
||||
def static(
|
||||
self,
|
||||
uri: str,
|
||||
file_or_directory: Union[str, bytes, PurePath],
|
||||
pattern: str = r"/?.+",
|
||||
use_modified_since: bool = True,
|
||||
use_content_range: bool = False,
|
||||
stream_large_files: bool = False,
|
||||
name: str = "static",
|
||||
host: Optional[str] = None,
|
||||
strict_slashes: Optional[bool] = None,
|
||||
content_type: Optional[bool] = None,
|
||||
apply: bool = True,
|
||||
resource_type: Optional[str] = None,
|
||||
):
|
||||
"""
|
||||
Register a root to serve files from. The input can either be a
|
||||
file or a directory. This method will enable an easy and simple way
|
||||
to setup the :class:`Route` necessary to serve the static files.
|
||||
|
||||
:param uri: URL path to be used for serving static content
|
||||
:param file_or_directory: Path for the Static file/directory with
|
||||
static files
|
||||
:param pattern: Regex Pattern identifying the valid static files
|
||||
:param use_modified_since: If true, send file modified time, and return
|
||||
not modified if the browser's matches the server's
|
||||
:param use_content_range: If true, process header for range requests
|
||||
and sends the file part that is requested
|
||||
:param stream_large_files: If true, use the
|
||||
:func:`StreamingHTTPResponse.file_stream` handler rather
|
||||
than the :func:`HTTPResponse.file` handler to send the file.
|
||||
If this is an integer, this represents the threshold size to
|
||||
switch to :func:`StreamingHTTPResponse.file_stream`
|
||||
:param name: user defined name used for url_for
|
||||
:param host: Host IP or FQDN for the service to use
|
||||
:param strict_slashes: Instruct :class:`Sanic` to check if the request
|
||||
URLs need to terminate with a */*
|
||||
:param content_type: user defined content type for header
|
||||
:return: routes registered on the router
|
||||
:rtype: List[sanic.router.Route]
|
||||
"""
|
||||
|
||||
name = self._generate_name(name)
|
||||
|
||||
if strict_slashes is None and self.strict_slashes is not None:
|
||||
strict_slashes = self.strict_slashes
|
||||
|
||||
if not isinstance(file_or_directory, (str, bytes, PurePath)):
|
||||
raise ValueError(
|
||||
f"Static route must be a valid path, not {file_or_directory}"
|
||||
)
|
||||
|
||||
static = FutureStatic(
|
||||
uri,
|
||||
file_or_directory,
|
||||
pattern,
|
||||
use_modified_since,
|
||||
use_content_range,
|
||||
stream_large_files,
|
||||
name,
|
||||
host,
|
||||
strict_slashes,
|
||||
content_type,
|
||||
resource_type,
|
||||
)
|
||||
self._future_statics.add(static)
|
||||
|
||||
if apply:
|
||||
self._apply_static(static)
|
||||
|
||||
def _generate_name(self, *objects) -> str:
|
||||
name = None
|
||||
|
||||
for obj in objects:
|
||||
if obj:
|
||||
if isinstance(obj, str):
|
||||
name = obj
|
||||
break
|
||||
|
||||
try:
|
||||
name = obj.name
|
||||
except AttributeError:
|
||||
try:
|
||||
name = obj.__name__
|
||||
except AttributeError:
|
||||
continue
|
||||
else:
|
||||
break
|
||||
|
||||
if not name: # noqa
|
||||
raise ValueError("Could not generate a name for handler")
|
||||
|
||||
if not name.startswith(f"{self.name}."):
|
||||
name = f"{self.name}.{name}"
|
||||
|
||||
return name
|
||||
|
||||
async def _get_file_path(self, file_or_directory, __file_uri__, not_found):
|
||||
file_path_raw = Path(unquote(file_or_directory))
|
||||
root_path = file_path = file_path_raw.resolve()
|
||||
|
||||
if __file_uri__:
|
||||
# Strip all / that in the beginning of the URL to help prevent
|
||||
# python from herping a derp and treating the uri as an
|
||||
# absolute path
|
||||
unquoted_file_uri = unquote(__file_uri__).lstrip("/")
|
||||
file_path_raw = Path(file_or_directory, unquoted_file_uri)
|
||||
file_path = file_path_raw.resolve()
|
||||
if (
|
||||
file_path < root_path and not file_path_raw.is_symlink()
|
||||
) or ".." in file_path_raw.parts:
|
||||
error_logger.exception(
|
||||
f"File not found: path={file_or_directory}, "
|
||||
f"relative_url={__file_uri__}"
|
||||
)
|
||||
raise not_found
|
||||
|
||||
try:
|
||||
file_path.relative_to(root_path)
|
||||
except ValueError:
|
||||
if not file_path_raw.is_symlink():
|
||||
error_logger.exception(
|
||||
f"File not found: path={file_or_directory}, "
|
||||
f"relative_url={__file_uri__}"
|
||||
)
|
||||
raise not_found
|
||||
return file_path
|
||||
|
||||
async def _static_request_handler(
|
||||
self,
|
||||
file_or_directory,
|
||||
use_modified_since,
|
||||
use_content_range,
|
||||
stream_large_files,
|
||||
request,
|
||||
content_type=None,
|
||||
__file_uri__=None,
|
||||
):
|
||||
not_found = FileNotFound(
|
||||
"File not found",
|
||||
path=file_or_directory,
|
||||
relative_url=__file_uri__,
|
||||
)
|
||||
|
||||
# Merge served directory and requested file if provided
|
||||
file_path = await self._get_file_path(
|
||||
file_or_directory, __file_uri__, not_found
|
||||
)
|
||||
|
||||
try:
|
||||
headers = {}
|
||||
# Check if the client has been sent this file before
|
||||
# and it has not been modified since
|
||||
stats = None
|
||||
if use_modified_since:
|
||||
stats = await stat_async(file_path)
|
||||
modified_since = stats.st_mtime
|
||||
response = await validate_file(request.headers, modified_since)
|
||||
if response:
|
||||
return response
|
||||
headers["Last-Modified"] = formatdate(
|
||||
modified_since, usegmt=True
|
||||
)
|
||||
_range = None
|
||||
if use_content_range:
|
||||
_range = None
|
||||
if not stats:
|
||||
stats = await stat_async(file_path)
|
||||
headers["Accept-Ranges"] = "bytes"
|
||||
headers["Content-Length"] = str(stats.st_size)
|
||||
if request.method != "HEAD":
|
||||
try:
|
||||
_range = ContentRangeHandler(request, stats)
|
||||
except HeaderNotFound:
|
||||
pass
|
||||
else:
|
||||
del headers["Content-Length"]
|
||||
headers.update(_range.headers)
|
||||
|
||||
if "content-type" not in headers:
|
||||
content_type = (
|
||||
content_type
|
||||
or guess_type(file_path)[0]
|
||||
or DEFAULT_HTTP_CONTENT_TYPE
|
||||
)
|
||||
|
||||
if "charset=" not in content_type and (
|
||||
content_type.startswith("text/")
|
||||
or content_type == "application/javascript"
|
||||
):
|
||||
content_type += "; charset=utf-8"
|
||||
|
||||
headers["Content-Type"] = content_type
|
||||
|
||||
if request.method == "HEAD":
|
||||
return HTTPResponse(headers=headers)
|
||||
else:
|
||||
if stream_large_files:
|
||||
if type(stream_large_files) == int:
|
||||
threshold = stream_large_files
|
||||
else:
|
||||
threshold = 1024 * 1024
|
||||
|
||||
if not stats:
|
||||
stats = await stat_async(file_path)
|
||||
if stats.st_size >= threshold:
|
||||
return await file_stream(
|
||||
file_path, headers=headers, _range=_range
|
||||
)
|
||||
return await file(file_path, headers=headers, _range=_range)
|
||||
except RangeNotSatisfiable:
|
||||
raise
|
||||
except FileNotFoundError:
|
||||
raise not_found
|
||||
except Exception:
|
||||
error_logger.exception(
|
||||
f"Exception in static request handler: "
|
||||
f"path={file_or_directory}, "
|
||||
f"relative_url={__file_uri__}"
|
||||
)
|
||||
raise
|
||||
|
||||
def _register_static(
|
||||
self,
|
||||
static: FutureStatic,
|
||||
):
|
||||
# TODO: Though sanic is not a file server, I feel like we should
|
||||
# at least make a good effort here. Modified-since is nice, but
|
||||
# we could also look into etags, expires, and caching
|
||||
"""
|
||||
Register a static directory handler with Sanic by adding a route to the
|
||||
router and registering a handler.
|
||||
|
||||
:param app: Sanic
|
||||
:param file_or_directory: File or directory path to serve from
|
||||
:type file_or_directory: Union[str,bytes,Path]
|
||||
:param uri: URL to serve from
|
||||
:type uri: str
|
||||
:param pattern: regular expression used to match files in the URL
|
||||
:param use_modified_since: If true, send file modified time, and return
|
||||
not modified if the browser's matches the
|
||||
server's
|
||||
:param use_content_range: If true, process header for range requests
|
||||
and sends the file part that is requested
|
||||
:param stream_large_files: If true, use the file_stream() handler
|
||||
rather than the file() handler to send the file
|
||||
If this is an integer, this represents the
|
||||
threshold size to switch to file_stream()
|
||||
:param name: user defined name used for url_for
|
||||
:type name: str
|
||||
:param content_type: user defined content type for header
|
||||
:return: registered static routes
|
||||
:rtype: List[sanic.router.Route]
|
||||
"""
|
||||
|
||||
if isinstance(static.file_or_directory, bytes):
|
||||
file_or_directory = static.file_or_directory.decode("utf-8")
|
||||
elif isinstance(static.file_or_directory, PurePath):
|
||||
file_or_directory = str(static.file_or_directory)
|
||||
elif not isinstance(static.file_or_directory, str):
|
||||
raise ValueError("Invalid file path string.")
|
||||
else:
|
||||
file_or_directory = static.file_or_directory
|
||||
|
||||
uri = static.uri
|
||||
name = static.name
|
||||
# If we're not trying to match a file directly,
|
||||
# serve from the folder
|
||||
if not static.resource_type:
|
||||
if not path.isfile(file_or_directory):
|
||||
uri = uri.rstrip("/")
|
||||
uri += "/<__file_uri__:path>"
|
||||
elif static.resource_type == "dir":
|
||||
if path.isfile(file_or_directory):
|
||||
raise TypeError(
|
||||
"Resource type improperly identified as directory. "
|
||||
f"'{file_or_directory}'"
|
||||
)
|
||||
uri = uri.rstrip("/")
|
||||
uri += "/<__file_uri__:path>"
|
||||
elif static.resource_type == "file" and not path.isfile(
|
||||
file_or_directory
|
||||
):
|
||||
raise TypeError(
|
||||
"Resource type improperly identified as file. "
|
||||
f"'{file_or_directory}'"
|
||||
)
|
||||
elif static.resource_type != "file":
|
||||
raise ValueError(
|
||||
"The resource_type should be set to 'file' or 'dir'"
|
||||
)
|
||||
|
||||
# special prefix for static files
|
||||
# if not static.name.startswith("_static_"):
|
||||
# name = f"_static_{static.name}"
|
||||
|
||||
_handler = wraps(self._static_request_handler)(
|
||||
partial(
|
||||
self._static_request_handler,
|
||||
file_or_directory,
|
||||
static.use_modified_since,
|
||||
static.use_content_range,
|
||||
static.stream_large_files,
|
||||
content_type=static.content_type,
|
||||
)
|
||||
)
|
||||
|
||||
route, _ = self.route( # type: ignore
|
||||
uri=uri,
|
||||
methods=["GET", "HEAD"],
|
||||
name=name,
|
||||
host=static.host,
|
||||
strict_slashes=static.strict_slashes,
|
||||
static=True,
|
||||
)(_handler)
|
||||
|
||||
return route
|
||||
|
||||
def _determine_error_format(self, handler) -> str:
|
||||
with suppress(OSError, TypeError):
|
||||
src = dedent(getsource(handler))
|
||||
|
||||
@@ -267,11 +267,11 @@ class StartupMixin(metaclass=SanicMeta):
|
||||
if single_process and legacy:
|
||||
raise RuntimeError("Cannot run single process and legacy mode")
|
||||
|
||||
# if register_sys_signals is False and not (single_process or legacy):
|
||||
# raise RuntimeError(
|
||||
# "Cannot run Sanic.serve with register_sys_signals=False. "
|
||||
# "Use either Sanic.serve_single or Sanic.serve_legacy."
|
||||
# )
|
||||
if register_sys_signals is False and not (single_process or legacy):
|
||||
raise RuntimeError(
|
||||
"Cannot run Sanic.serve with register_sys_signals=False. "
|
||||
"Use either Sanic.serve_single or Sanic.serve_legacy."
|
||||
)
|
||||
|
||||
if motd_display:
|
||||
self.config.MOTD_DISPLAY.update(motd_display)
|
||||
|
||||
@@ -1,348 +0,0 @@
|
||||
from email.utils import formatdate
|
||||
from functools import partial, wraps
|
||||
from mimetypes import guess_type
|
||||
from os import PathLike, path
|
||||
from pathlib import Path, PurePath
|
||||
from typing import Optional, Sequence, Set, Union, cast
|
||||
from urllib.parse import unquote
|
||||
|
||||
from sanic_routing.route import Route
|
||||
|
||||
from sanic.base.meta import SanicMeta
|
||||
from sanic.compat import stat_async
|
||||
from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE
|
||||
from sanic.exceptions import FileNotFound, HeaderNotFound, RangeNotSatisfiable
|
||||
from sanic.handlers import ContentRangeHandler
|
||||
from sanic.handlers.directory import DirectoryHandler
|
||||
from sanic.log import deprecation, error_logger
|
||||
from sanic.mixins.base import BaseMixin
|
||||
from sanic.models.futures import FutureStatic
|
||||
from sanic.request import Request
|
||||
from sanic.response import HTTPResponse, file, file_stream, validate_file
|
||||
|
||||
|
||||
class StaticMixin(BaseMixin, metaclass=SanicMeta):
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
self._future_statics: Set[FutureStatic] = set()
|
||||
|
||||
def _apply_static(self, static: FutureStatic) -> Route:
|
||||
raise NotImplementedError # noqa
|
||||
|
||||
def static(
|
||||
self,
|
||||
uri: str,
|
||||
file_or_directory: Union[PathLike, str, bytes],
|
||||
pattern: str = r"/?.+",
|
||||
use_modified_since: bool = True,
|
||||
use_content_range: bool = False,
|
||||
stream_large_files: Union[bool, int] = False,
|
||||
name: str = "static",
|
||||
host: Optional[str] = None,
|
||||
strict_slashes: Optional[bool] = None,
|
||||
content_type: Optional[str] = None,
|
||||
apply: bool = True,
|
||||
resource_type: Optional[str] = None,
|
||||
index: Optional[Union[str, Sequence[str]]] = None,
|
||||
directory_view: bool = False,
|
||||
directory_handler: Optional[DirectoryHandler] = None,
|
||||
):
|
||||
"""
|
||||
Register a root to serve files from. The input can either be a
|
||||
file or a directory. This method will enable an easy and simple way
|
||||
to setup the :class:`Route` necessary to serve the static files.
|
||||
|
||||
:param uri: URL path to be used for serving static content
|
||||
:param file_or_directory: Path for the Static file/directory with
|
||||
static files
|
||||
:param pattern: Regex Pattern identifying the valid static files
|
||||
:param use_modified_since: If true, send file modified time, and return
|
||||
not modified if the browser's matches the server's
|
||||
:param use_content_range: If true, process header for range requests
|
||||
and sends the file part that is requested
|
||||
:param stream_large_files: If true, use the
|
||||
:func:`StreamingHTTPResponse.file_stream` handler rather
|
||||
than the :func:`HTTPResponse.file` handler to send the file.
|
||||
If this is an integer, this represents the threshold size to
|
||||
switch to :func:`StreamingHTTPResponse.file_stream`
|
||||
:param name: user defined name used for url_for
|
||||
:param host: Host IP or FQDN for the service to use
|
||||
:param strict_slashes: Instruct :class:`Sanic` to check if the request
|
||||
URLs need to terminate with a */*
|
||||
:param content_type: user defined content type for header
|
||||
:param apply: If true, will register the route immediately
|
||||
:param resource_type: Explicitly declare a resource to be a "
|
||||
file" or a "dir"
|
||||
:param index: When exposing against a directory, index is the name that
|
||||
will be served as the default file. When multiple files names are
|
||||
passed, then they will be tried in order.
|
||||
:param directory_view: Whether to fallback to showing the directory
|
||||
viewer when exposing a directory
|
||||
:param directory_handler: An instance of :class:`DirectoryHandler`
|
||||
that can be used for explicitly controlling and subclassing the
|
||||
behavior of the default directory handler
|
||||
:return: routes registered on the router
|
||||
:rtype: List[sanic.router.Route]
|
||||
"""
|
||||
|
||||
name = self._generate_name(name)
|
||||
|
||||
if strict_slashes is None and self.strict_slashes is not None:
|
||||
strict_slashes = self.strict_slashes
|
||||
|
||||
if not isinstance(file_or_directory, (str, bytes, PurePath)):
|
||||
raise ValueError(
|
||||
f"Static route must be a valid path, not {file_or_directory}"
|
||||
)
|
||||
|
||||
if isinstance(file_or_directory, bytes):
|
||||
deprecation(
|
||||
"Serving a static directory with a bytes string is "
|
||||
"deprecated and will be removed in v22.9.",
|
||||
22.9,
|
||||
)
|
||||
file_or_directory = cast(str, file_or_directory.decode())
|
||||
file_or_directory = Path(file_or_directory)
|
||||
|
||||
if directory_handler and (directory_view or index):
|
||||
raise ValueError(
|
||||
"When explicitly setting directory_handler, you cannot "
|
||||
"set either directory_view or index. Instead, pass "
|
||||
"these arguments to your DirectoryHandler instance."
|
||||
)
|
||||
|
||||
if not directory_handler:
|
||||
directory_handler = DirectoryHandler(
|
||||
uri=uri,
|
||||
directory=file_or_directory,
|
||||
directory_view=directory_view,
|
||||
index=index,
|
||||
)
|
||||
|
||||
static = FutureStatic(
|
||||
uri,
|
||||
file_or_directory,
|
||||
pattern,
|
||||
use_modified_since,
|
||||
use_content_range,
|
||||
stream_large_files,
|
||||
name,
|
||||
host,
|
||||
strict_slashes,
|
||||
content_type,
|
||||
resource_type,
|
||||
directory_handler,
|
||||
)
|
||||
self._future_statics.add(static)
|
||||
|
||||
if apply:
|
||||
self._apply_static(static)
|
||||
|
||||
|
||||
class StaticHandleMixin(metaclass=SanicMeta):
|
||||
def _apply_static(self, static: FutureStatic) -> Route:
|
||||
return self._register_static(static)
|
||||
|
||||
def _register_static(
|
||||
self,
|
||||
static: FutureStatic,
|
||||
):
|
||||
# TODO: Though sanic is not a file server, I feel like we should
|
||||
# at least make a good effort here. Modified-since is nice, but
|
||||
# we could also look into etags, expires, and caching
|
||||
"""
|
||||
Register a static directory handler with Sanic by adding a route to the
|
||||
router and registering a handler.
|
||||
"""
|
||||
|
||||
if isinstance(static.file_or_directory, bytes):
|
||||
file_or_directory = static.file_or_directory.decode("utf-8")
|
||||
elif isinstance(static.file_or_directory, PurePath):
|
||||
file_or_directory = str(static.file_or_directory)
|
||||
elif not isinstance(static.file_or_directory, str):
|
||||
raise ValueError("Invalid file path string.")
|
||||
else:
|
||||
file_or_directory = static.file_or_directory
|
||||
|
||||
uri = static.uri
|
||||
name = static.name
|
||||
# If we're not trying to match a file directly,
|
||||
# serve from the folder
|
||||
if not static.resource_type:
|
||||
if not path.isfile(file_or_directory):
|
||||
uri = uri.rstrip("/")
|
||||
uri += "/<__file_uri__:path>"
|
||||
elif static.resource_type == "dir":
|
||||
if path.isfile(file_or_directory):
|
||||
raise TypeError(
|
||||
"Resource type improperly identified as directory. "
|
||||
f"'{file_or_directory}'"
|
||||
)
|
||||
uri = uri.rstrip("/")
|
||||
uri += "/<__file_uri__:path>"
|
||||
elif static.resource_type == "file" and not path.isfile(
|
||||
file_or_directory
|
||||
):
|
||||
raise TypeError(
|
||||
"Resource type improperly identified as file. "
|
||||
f"'{file_or_directory}'"
|
||||
)
|
||||
elif static.resource_type != "file":
|
||||
raise ValueError(
|
||||
"The resource_type should be set to 'file' or 'dir'"
|
||||
)
|
||||
|
||||
# special prefix for static files
|
||||
# if not static.name.startswith("_static_"):
|
||||
# name = f"_static_{static.name}"
|
||||
|
||||
_handler = wraps(self._static_request_handler)(
|
||||
partial(
|
||||
self._static_request_handler,
|
||||
file_or_directory=file_or_directory,
|
||||
use_modified_since=static.use_modified_since,
|
||||
use_content_range=static.use_content_range,
|
||||
stream_large_files=static.stream_large_files,
|
||||
content_type=static.content_type,
|
||||
directory_handler=static.directory_handler,
|
||||
)
|
||||
)
|
||||
|
||||
route, _ = self.route( # type: ignore
|
||||
uri=uri,
|
||||
methods=["GET", "HEAD"],
|
||||
name=name,
|
||||
host=static.host,
|
||||
strict_slashes=static.strict_slashes,
|
||||
static=True,
|
||||
)(_handler)
|
||||
|
||||
return route
|
||||
|
||||
async def _static_request_handler(
|
||||
self,
|
||||
request: Request,
|
||||
*,
|
||||
file_or_directory: PathLike,
|
||||
use_modified_since: bool,
|
||||
use_content_range: bool,
|
||||
stream_large_files: Union[bool, int],
|
||||
directory_handler: DirectoryHandler,
|
||||
content_type: Optional[str] = None,
|
||||
__file_uri__: Optional[str] = None,
|
||||
):
|
||||
not_found = FileNotFound(
|
||||
"File not found",
|
||||
path=file_or_directory,
|
||||
relative_url=__file_uri__,
|
||||
)
|
||||
|
||||
# Merge served directory and requested file if provided
|
||||
file_path = await self._get_file_path(
|
||||
file_or_directory, __file_uri__, not_found
|
||||
)
|
||||
|
||||
try:
|
||||
headers = {}
|
||||
# Check if the client has been sent this file before
|
||||
# and it has not been modified since
|
||||
stats = None
|
||||
if use_modified_since:
|
||||
stats = await stat_async(file_path)
|
||||
modified_since = stats.st_mtime
|
||||
response = await validate_file(request.headers, modified_since)
|
||||
if response:
|
||||
return response
|
||||
headers["Last-Modified"] = formatdate(
|
||||
modified_since, usegmt=True
|
||||
)
|
||||
_range = None
|
||||
if use_content_range:
|
||||
_range = None
|
||||
if not stats:
|
||||
stats = await stat_async(file_path)
|
||||
headers["Accept-Ranges"] = "bytes"
|
||||
headers["Content-Length"] = str(stats.st_size)
|
||||
if request.method != "HEAD":
|
||||
try:
|
||||
_range = ContentRangeHandler(request, stats)
|
||||
except HeaderNotFound:
|
||||
pass
|
||||
else:
|
||||
del headers["Content-Length"]
|
||||
headers.update(_range.headers)
|
||||
|
||||
if "content-type" not in headers:
|
||||
content_type = (
|
||||
content_type
|
||||
or guess_type(file_path)[0]
|
||||
or DEFAULT_HTTP_CONTENT_TYPE
|
||||
)
|
||||
|
||||
if "charset=" not in content_type and (
|
||||
content_type.startswith("text/")
|
||||
or content_type == "application/javascript"
|
||||
):
|
||||
content_type += "; charset=utf-8"
|
||||
|
||||
headers["Content-Type"] = content_type
|
||||
|
||||
if request.method == "HEAD":
|
||||
return HTTPResponse(headers=headers)
|
||||
else:
|
||||
if stream_large_files:
|
||||
if isinstance(stream_large_files, bool):
|
||||
threshold = 1024 * 1024
|
||||
else:
|
||||
threshold = stream_large_files
|
||||
|
||||
if not stats:
|
||||
stats = await stat_async(file_path)
|
||||
if stats.st_size >= threshold:
|
||||
return await file_stream(
|
||||
file_path, headers=headers, _range=_range
|
||||
)
|
||||
return await file(file_path, headers=headers, _range=_range)
|
||||
except (IsADirectoryError, PermissionError):
|
||||
return await directory_handler.handle(request, request.path)
|
||||
except RangeNotSatisfiable:
|
||||
raise
|
||||
except FileNotFoundError:
|
||||
raise not_found
|
||||
except Exception:
|
||||
error_logger.exception(
|
||||
"Exception in static request handler: "
|
||||
f"path={file_or_directory}, "
|
||||
f"relative_url={__file_uri__}"
|
||||
)
|
||||
raise
|
||||
|
||||
async def _get_file_path(self, file_or_directory, __file_uri__, not_found):
|
||||
file_path_raw = Path(unquote(file_or_directory))
|
||||
root_path = file_path = file_path_raw.resolve()
|
||||
|
||||
if __file_uri__:
|
||||
# Strip all / that in the beginning of the URL to help prevent
|
||||
# python from herping a derp and treating the uri as an
|
||||
# absolute path
|
||||
unquoted_file_uri = unquote(__file_uri__).lstrip("/")
|
||||
file_path_raw = Path(file_or_directory, unquoted_file_uri)
|
||||
file_path = file_path_raw.resolve()
|
||||
if (
|
||||
file_path < root_path and not file_path_raw.is_symlink()
|
||||
) or ".." in file_path_raw.parts:
|
||||
error_logger.exception(
|
||||
f"File not found: path={file_or_directory}, "
|
||||
f"relative_url={__file_uri__}"
|
||||
)
|
||||
raise not_found
|
||||
|
||||
try:
|
||||
file_path.relative_to(root_path)
|
||||
except ValueError:
|
||||
if not file_path_raw.is_symlink():
|
||||
error_logger.exception(
|
||||
f"File not found: path={file_or_directory}, "
|
||||
f"relative_url={__file_uri__}"
|
||||
)
|
||||
raise not_found
|
||||
return file_path
|
||||
@@ -1,7 +1,6 @@
|
||||
from pathlib import Path
|
||||
from pathlib import PurePath
|
||||
from typing import Dict, Iterable, List, NamedTuple, Optional, Union
|
||||
|
||||
from sanic.handlers.directory import DirectoryHandler
|
||||
from sanic.models.handler_types import (
|
||||
ErrorMiddlewareType,
|
||||
ListenerType,
|
||||
@@ -47,17 +46,16 @@ class FutureException(NamedTuple):
|
||||
|
||||
class FutureStatic(NamedTuple):
|
||||
uri: str
|
||||
file_or_directory: Path
|
||||
file_or_directory: Union[str, bytes, PurePath]
|
||||
pattern: str
|
||||
use_modified_since: bool
|
||||
use_content_range: bool
|
||||
stream_large_files: Union[bool, int]
|
||||
stream_large_files: bool
|
||||
name: str
|
||||
host: Optional[str]
|
||||
strict_slashes: Optional[bool]
|
||||
content_type: Optional[str]
|
||||
content_type: Optional[bool]
|
||||
resource_type: Optional[str]
|
||||
directory_handler: DirectoryHandler
|
||||
|
||||
|
||||
class FutureSignal(NamedTuple):
|
||||
|
||||
@@ -1,52 +0,0 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from html5tagger import HTML, Document
|
||||
|
||||
from sanic import __version__ as VERSION
|
||||
from sanic.application.logo import SVG_LOGO_SIMPLE
|
||||
from sanic.pages.css import CSS
|
||||
|
||||
|
||||
class BasePage(ABC, metaclass=CSS): # no cov
|
||||
TITLE = "Sanic"
|
||||
CSS: str
|
||||
|
||||
def __init__(self, debug: bool = True) -> None:
|
||||
self.doc = None
|
||||
self.debug = debug
|
||||
|
||||
@property
|
||||
def style(self) -> str:
|
||||
return self.CSS
|
||||
|
||||
def render(self) -> str:
|
||||
self.doc = Document(self.TITLE, lang="en")
|
||||
self._head()
|
||||
self._body()
|
||||
self._foot()
|
||||
return str(self.doc)
|
||||
|
||||
def _head(self) -> None:
|
||||
self.doc.style(HTML(self.style))
|
||||
with self.doc.header:
|
||||
self.doc.div(self.TITLE)
|
||||
|
||||
def _foot(self) -> None:
|
||||
with self.doc.footer:
|
||||
self.doc.div("powered by")
|
||||
with self.doc.div:
|
||||
self._sanic_logo()
|
||||
if self.debug:
|
||||
self.doc.div(f"Version {VERSION}")
|
||||
|
||||
@abstractmethod
|
||||
def _body(self) -> None:
|
||||
...
|
||||
|
||||
def _sanic_logo(self) -> None:
|
||||
self.doc.a(
|
||||
HTML(SVG_LOGO_SIMPLE),
|
||||
href="https://sanic.dev",
|
||||
target="_blank",
|
||||
referrerpolicy="no-referrer",
|
||||
)
|
||||
@@ -1,35 +0,0 @@
|
||||
from abc import ABCMeta
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
CURRENT_DIR = Path(__file__).parent
|
||||
|
||||
|
||||
def _extract_style(maybe_style: Optional[str], name: str) -> str:
|
||||
if maybe_style is not None:
|
||||
maybe_path = Path(maybe_style)
|
||||
if maybe_path.exists():
|
||||
return maybe_path.read_text(encoding="UTF-8")
|
||||
return maybe_style
|
||||
maybe_path = CURRENT_DIR / "styles" / f"{name}.css"
|
||||
if maybe_path.exists():
|
||||
return maybe_path.read_text(encoding="UTF-8")
|
||||
return ""
|
||||
|
||||
|
||||
class CSS(ABCMeta):
|
||||
"""Cascade stylesheets, i.e. combine all ancestor styles"""
|
||||
|
||||
def __new__(cls, name, bases, attrs):
|
||||
Page = super().__new__(cls, name, bases, attrs)
|
||||
# Use a locally defined STYLE or the one from styles directory
|
||||
Page.STYLE = _extract_style(attrs.get("STYLE_FILE"), name)
|
||||
Page.STYLE += attrs.get("STYLE_APPEND", "")
|
||||
# Combine with all ancestor styles
|
||||
Page.CSS = "".join(
|
||||
Class.STYLE
|
||||
for Class in reversed(Page.__mro__)
|
||||
if type(Class) is CSS
|
||||
)
|
||||
return Page
|
||||
@@ -1,66 +0,0 @@
|
||||
import sys
|
||||
|
||||
from typing import Dict, Iterable
|
||||
|
||||
from html5tagger import E
|
||||
|
||||
from .base import BasePage
|
||||
|
||||
|
||||
if sys.version_info < (3, 8): # no cov
|
||||
FileInfo = Dict
|
||||
|
||||
else:
|
||||
from typing import TypedDict
|
||||
|
||||
class FileInfo(TypedDict):
|
||||
icon: str
|
||||
file_name: str
|
||||
file_access: str
|
||||
file_size: str
|
||||
|
||||
|
||||
class DirectoryPage(BasePage): # no cov
|
||||
TITLE = "Directory Viewer"
|
||||
|
||||
def __init__(
|
||||
self, files: Iterable[FileInfo], url: str, debug: bool
|
||||
) -> None:
|
||||
super().__init__(debug)
|
||||
self.files = files
|
||||
self.url = url
|
||||
|
||||
def _body(self) -> None:
|
||||
with self.doc.main:
|
||||
self._headline()
|
||||
files = list(self.files)
|
||||
if files:
|
||||
self._file_table(files)
|
||||
else:
|
||||
self.doc.p("The folder is empty.")
|
||||
|
||||
def _headline(self):
|
||||
"""Implement a heading with the current path, combined with
|
||||
breadcrumb links"""
|
||||
with self.doc.h1(id="breadcrumbs"):
|
||||
p = self.url.split("/")[:-1]
|
||||
|
||||
for i, part in enumerate(p):
|
||||
path = "/".join(p[: i + 1]) + "/"
|
||||
with self.doc.a(href=path):
|
||||
self.doc.span(part, class_="dir").span("/", class_="sep")
|
||||
|
||||
def _file_table(self, files: Iterable[FileInfo]):
|
||||
with self.doc.table(class_="autoindex container"):
|
||||
for f in files:
|
||||
self._file_row(**f)
|
||||
|
||||
def _file_row(
|
||||
self,
|
||||
icon: str,
|
||||
file_name: str,
|
||||
file_access: str,
|
||||
file_size: str,
|
||||
):
|
||||
first = E.span(icon, class_="icon").a(file_name, href=file_name)
|
||||
self.doc.tr.td(first).td(file_size).td(file_access)
|
||||
@@ -1,105 +0,0 @@
|
||||
from typing import Any, Mapping
|
||||
|
||||
import tracerite.html
|
||||
|
||||
from html5tagger import E
|
||||
from tracerite import html_traceback, inspector
|
||||
|
||||
from sanic.request import Request
|
||||
|
||||
from .base import BasePage
|
||||
|
||||
|
||||
# Avoid showing the request in the traceback variable inspectors
|
||||
inspector.blacklist_types += (Request,)
|
||||
|
||||
ENDUSER_TEXT = """We're sorry, but it looks like something went wrong. Please try refreshing the page or navigating back to the homepage. If the issue persists, our technical team is working to resolve it as soon as possible. We apologize for the inconvenience and appreciate your patience.""" # noqa: E501
|
||||
|
||||
|
||||
class ErrorPage(BasePage):
|
||||
STYLE_APPEND = tracerite.html.style
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
title: str,
|
||||
text: str,
|
||||
request: Request,
|
||||
exc: Exception,
|
||||
full: bool,
|
||||
) -> None:
|
||||
super().__init__()
|
||||
# Internal server errors come with the text of the exception,
|
||||
# which we don't want to show to the user.
|
||||
# FIXME: Needs to be done in a better way, elsewhere
|
||||
if "Internal Server Error" in title:
|
||||
text = "The application encountered an unexpected error and could not continue." # noqa: E501
|
||||
name = request.app.name.replace("_", " ").strip()
|
||||
if name.islower():
|
||||
name = name.title()
|
||||
self.TITLE = E("Application ").strong(name)(
|
||||
" cannot handle your request"
|
||||
)
|
||||
self.title = title
|
||||
self.text = text
|
||||
self.request = request
|
||||
self.exc = exc
|
||||
self.full = full
|
||||
|
||||
def _head(self) -> None:
|
||||
self.doc._script(tracerite.html.javascript)
|
||||
super()._head()
|
||||
|
||||
def _body(self) -> None:
|
||||
debug = self.request.app.debug
|
||||
try:
|
||||
route_name = self.request.route.name
|
||||
except AttributeError:
|
||||
route_name = "[route not found]"
|
||||
with self.doc.main:
|
||||
self.doc.h1(f"⚠️ {self.title}").p(self.text)
|
||||
# Show context details if available on the exception
|
||||
context = getattr(self.exc, "context", None)
|
||||
if context:
|
||||
self._key_value_table(
|
||||
"Issue context", "exception-context", context
|
||||
)
|
||||
|
||||
if not debug:
|
||||
with self.doc.div(id="enduser"):
|
||||
self.doc.p(ENDUSER_TEXT).p.a("Front Page", href="/")
|
||||
return
|
||||
# Show additional details in debug mode,
|
||||
# open by default for 500 errors
|
||||
with self.doc.details(open=self.full, class_="smalltext"):
|
||||
# Show extra details if available on the exception
|
||||
extra = getattr(self.exc, "extra", None)
|
||||
if extra:
|
||||
self._key_value_table(
|
||||
"Issue extra data", "exception-extra", extra
|
||||
)
|
||||
|
||||
self.doc.summary(
|
||||
"Details for developers (Sanic debug mode only)"
|
||||
)
|
||||
if self.exc:
|
||||
self.doc.h2(f"Exception in {route_name}:")
|
||||
self.doc(html_traceback(self.exc, include_js_css=False))
|
||||
|
||||
self._key_value_table(
|
||||
f"{self.request.method} {self.request.path}",
|
||||
"request-headers",
|
||||
self.request.headers,
|
||||
)
|
||||
|
||||
def _key_value_table(
|
||||
self, title: str, table_id: str, data: Mapping[str, Any]
|
||||
) -> None:
|
||||
self.doc.h2(title)
|
||||
with self.doc.dl(id=table_id, class_="key-value-table smalltext"):
|
||||
for key, value in data.items():
|
||||
# Reading values may cause a new exception, so suppress it
|
||||
try:
|
||||
value = str(value)
|
||||
except Exception:
|
||||
value = E.em("Unable to display value")
|
||||
self.doc.dt.span(key, class_="nobr key").span(": ").dd(value)
|
||||
@@ -1,128 +0,0 @@
|
||||
/** BasePage **/
|
||||
|
||||
:root {
|
||||
--sanic: #ff0d68;
|
||||
--sanic-blue: #0092FF;
|
||||
--sanic-yellow: #FFE900;
|
||||
--sanic-purple: #833FE3;
|
||||
--sanic-green: #37ae6f;
|
||||
--sanic-background: #f1f5f9;
|
||||
--sanic-text: #1f2937;
|
||||
--sanic-tab-background: #fff;
|
||||
--sanic-tab-text: #0f172a;
|
||||
--sanic-tab-shadow: #adadad;
|
||||
--sanic-highlight-background: var(--sanic-yellow);
|
||||
--sanic-highlight-text: var(--sanic-text);
|
||||
--sanic-header-background: #000;
|
||||
}
|
||||
|
||||
@media (prefers-color-scheme: dark) {
|
||||
:root {
|
||||
--sanic-purple: #D246DE;
|
||||
--sanic-green: #16DB93;
|
||||
--sanic-background: #111;
|
||||
--sanic-text: #e7e7e7;
|
||||
--sanic-tab-background: #484848;
|
||||
--sanic-tab-text: #e1e1e1;
|
||||
--sanic-tab-shadow: #000;
|
||||
--sanic-highlight-background: var(--sanic-yellow);
|
||||
--sanic-highlight-text: #000;
|
||||
--sanic-header-background: #000;
|
||||
}
|
||||
}
|
||||
|
||||
html {
|
||||
font: 16px sans-serif;
|
||||
background: var(--sanic-background);
|
||||
color: var(--sanic-text);
|
||||
scrollbar-gutter: stable;
|
||||
overflow: hidden auto;
|
||||
}
|
||||
|
||||
body {
|
||||
margin: 0;
|
||||
font-size: 1.25rem;
|
||||
line-height: 125%;
|
||||
}
|
||||
|
||||
body>* {
|
||||
padding: 1rem 2vw;
|
||||
}
|
||||
|
||||
@media (max-width: 1000px) {
|
||||
body>* {
|
||||
padding: 0.5rem 1.5vw;
|
||||
}
|
||||
|
||||
html {
|
||||
/* Scale everything by rem of 6px-16px by viewport width */
|
||||
font-size: calc(6px + 10 * 100vw / 1000);
|
||||
}
|
||||
}
|
||||
|
||||
main {
|
||||
min-height: 70vh;
|
||||
/* Make sure the footer is closer to bottom */
|
||||
padding: 1rem 2.5rem;
|
||||
/* Generous padding for readability */
|
||||
}
|
||||
|
||||
.smalltext {
|
||||
font-size: 1.0rem;
|
||||
}
|
||||
|
||||
.container {
|
||||
min-width: 600px;
|
||||
max-width: 1600px;
|
||||
}
|
||||
|
||||
header {
|
||||
background: #111;
|
||||
color: #e1e1e1;
|
||||
border-bottom: 1px solid #272727;
|
||||
text-align: center;
|
||||
}
|
||||
|
||||
footer {
|
||||
text-align: center;
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
font-size: 0.8rem;
|
||||
margin-top: 2rem;
|
||||
}
|
||||
|
||||
h1 {
|
||||
text-align: left;
|
||||
}
|
||||
|
||||
a:visited {
|
||||
color: inherit;
|
||||
}
|
||||
|
||||
a {
|
||||
text-decoration: none;
|
||||
color: #88f;
|
||||
}
|
||||
|
||||
a:hover,
|
||||
a:focus {
|
||||
text-decoration: underline;
|
||||
outline: none;
|
||||
}
|
||||
|
||||
|
||||
span.icon {
|
||||
margin-right: 1rem;
|
||||
}
|
||||
|
||||
#logo-simple {
|
||||
height: 1.75rem;
|
||||
padding: 0 0.25rem;
|
||||
}
|
||||
|
||||
|
||||
@media (prefers-color-scheme: dark) {
|
||||
#logo-simple path:last-child {
|
||||
fill: #e1e1e1;
|
||||
}
|
||||
}
|
||||
@@ -1,63 +0,0 @@
|
||||
/** DirectoryPage **/
|
||||
#breadcrumbs>a:hover {
|
||||
text-decoration: none;
|
||||
}
|
||||
|
||||
#breadcrumbs>a .dir {
|
||||
padding: 0 0.25em;
|
||||
}
|
||||
|
||||
#breadcrumbs>a:first-child:hover::before,
|
||||
#breadcrumbs>a .dir:hover {
|
||||
text-decoration: underline;
|
||||
}
|
||||
|
||||
#breadcrumbs>a:first-child::before {
|
||||
content: "🏠";
|
||||
}
|
||||
|
||||
#breadcrumbs>a:last-child {
|
||||
color: #ff0d68;
|
||||
}
|
||||
|
||||
main a {
|
||||
color: inherit;
|
||||
font-weight: bold;
|
||||
}
|
||||
|
||||
table.autoindex {
|
||||
width: 100%;
|
||||
font-family: monospace;
|
||||
font-size: 1.25rem;
|
||||
}
|
||||
|
||||
table.autoindex tr {
|
||||
display: flex;
|
||||
}
|
||||
|
||||
table.autoindex tr:hover {
|
||||
background-color: #ddd;
|
||||
}
|
||||
|
||||
table.autoindex td {
|
||||
margin: 0 0.5rem;
|
||||
}
|
||||
|
||||
table.autoindex td:first-child {
|
||||
flex: 1;
|
||||
}
|
||||
|
||||
table.autoindex td:nth-child(2) {
|
||||
text-align: right;
|
||||
}
|
||||
|
||||
table.autoindex td:last-child {
|
||||
text-align: right;
|
||||
}
|
||||
|
||||
|
||||
@media (prefers-color-scheme: dark) {
|
||||
table.autoindex tr:hover {
|
||||
background-color: #222;
|
||||
}
|
||||
}
|
||||
@@ -1,105 +0,0 @@
|
||||
/** ErrorPage **/
|
||||
#enduser {
|
||||
max-width: 30em;
|
||||
margin: 5em auto 5em auto;
|
||||
text-align: justify;
|
||||
/*text-justify: both;*/
|
||||
}
|
||||
|
||||
#enduser a {
|
||||
color: var(--sanic-blue);
|
||||
}
|
||||
|
||||
#enduser p:last-child {
|
||||
text-align: right;
|
||||
}
|
||||
|
||||
summary {
|
||||
margin-top: 3em;
|
||||
color: var(--sanic-blue);
|
||||
cursor: pointer;
|
||||
}
|
||||
|
||||
.tracerite {
|
||||
--tracerite-var: var(--sanic-blue);
|
||||
--tracerite-val: var(--sanic-text);
|
||||
--tracerite-type: var(--sanic-green);
|
||||
--tracerite-exception: var(--sanic);
|
||||
--tracerite-highlight: var(--sanic-yellow);
|
||||
--tracerite-tab: var(--sanic-tab-background);
|
||||
--tracerite-tab-text: var(--sanic-tab-text);
|
||||
}
|
||||
|
||||
.tracerite>h3 {
|
||||
margin: 0.5rem 0 !important;
|
||||
}
|
||||
|
||||
.tracerite .traceback-labels button {
|
||||
font-size: 0.8rem !important;
|
||||
line-height: 120% !important;
|
||||
background: var(--tracerite-tab) !important;
|
||||
color: var(--tracerite-tab-text) !important;
|
||||
transition: 0.3s;
|
||||
cursor: pointer;
|
||||
}
|
||||
|
||||
.tracerite .traceback-labels {
|
||||
padding-top: 5px;
|
||||
}
|
||||
|
||||
.tracerite .traceback-labels button:hover {
|
||||
filter: contrast(150%) brightness(120%) drop-shadow(0 -0 2px var(--sanic-tab-shadow));
|
||||
}
|
||||
|
||||
.tracerite .traceback-details mark span {
|
||||
background: var(--sanic-highlight-background) !important;
|
||||
color: var(--sanic-highlight-text) !important;
|
||||
}
|
||||
|
||||
header {
|
||||
background: var(--sanic-header-background);
|
||||
}
|
||||
|
||||
h1 {
|
||||
/*margin-left: -1.5rem; !* Emoji partially in the left margin *!*/
|
||||
}
|
||||
|
||||
h2 {
|
||||
margin: 1em 0 0.2em 0;
|
||||
font-size: 1.25rem;
|
||||
color: var(--sanic-text);
|
||||
}
|
||||
|
||||
dl.key-value-table {
|
||||
width: 100%;
|
||||
margin: 0;
|
||||
display: grid;
|
||||
grid-template-columns: 1fr 5fr;
|
||||
grid-gap: .3em;
|
||||
white-space: pre-wrap;
|
||||
}
|
||||
|
||||
dl.key-value-table * {
|
||||
margin: 0;
|
||||
}
|
||||
|
||||
dl.key-value-table dt {
|
||||
color: #888;
|
||||
word-break: break-word;
|
||||
}
|
||||
|
||||
dl.key-value-table dd {
|
||||
word-break: break-all;
|
||||
/* Better breaking for cookies header and such */
|
||||
}
|
||||
|
||||
.tracerite .codeline {
|
||||
font-family:
|
||||
"Fira Code",
|
||||
"Source Code Pro",
|
||||
Menlo,
|
||||
Monaco,
|
||||
Consolas,
|
||||
Lucida Console,
|
||||
monospace;
|
||||
}
|
||||
@@ -232,7 +232,7 @@ class JSONResponse(HTTPResponse):
|
||||
body: Optional[Any] = None,
|
||||
status: int = 200,
|
||||
headers: Optional[Union[Header, Dict[str, str]]] = None,
|
||||
content_type: Optional[str] = None,
|
||||
content_type: str = "application/json",
|
||||
dumps: Optional[Callable[..., str]] = None,
|
||||
**kwargs: Any,
|
||||
):
|
||||
|
||||
@@ -39,13 +39,13 @@ class Router(BaseRouter):
|
||||
extra={"host": host} if host else None,
|
||||
)
|
||||
except RoutingNotFound as e:
|
||||
raise NotFound(f"Requested URL {e.path} not found") from None
|
||||
raise NotFound("Requested URL {} not found".format(e.path))
|
||||
except NoMethod as e:
|
||||
raise MethodNotAllowed(
|
||||
f"Method {method} not allowed for URL {path}",
|
||||
"Method {} not allowed for URL {}".format(method, path),
|
||||
method=method,
|
||||
allowed_methods=e.allowed_methods,
|
||||
) from None
|
||||
)
|
||||
|
||||
@lru_cache(maxsize=ROUTER_CACHE_SIZE)
|
||||
def get( # type: ignore
|
||||
@@ -61,7 +61,6 @@ class Router(BaseRouter):
|
||||
correct response
|
||||
:rtype: Tuple[ Route, RouteHandler, Dict[str, Any]]
|
||||
"""
|
||||
__tracebackhide__ = True
|
||||
return self._get(path, method, host)
|
||||
|
||||
def add( # type: ignore
|
||||
|
||||
@@ -130,14 +130,13 @@ def _setup_system_signals(
|
||||
register_sys_signals: bool,
|
||||
loop: asyncio.AbstractEventLoop,
|
||||
) -> None: # no cov
|
||||
print(">>>>>>>>>>>>>>>>>.", run_multiple)
|
||||
# Ignore SIGINT when run_multiple
|
||||
if run_multiple:
|
||||
signal_func(SIGINT, SIG_IGN)
|
||||
os.environ["SANIC_WORKER_PROCESS"] = "true"
|
||||
|
||||
# Register signals for graceful termination
|
||||
if register_sys_signals and False:
|
||||
if register_sys_signals:
|
||||
if OS_IS_WINDOWS:
|
||||
ctrlc_workaround_for_windows(app)
|
||||
else:
|
||||
|
||||
@@ -45,7 +45,7 @@ class WebSocketConnection:
|
||||
|
||||
await self._send(message)
|
||||
|
||||
async def recv(self, *args, **kwargs) -> Optional[Union[str, bytes]]:
|
||||
async def recv(self, *args, **kwargs) -> Optional[str]:
|
||||
message = await self._receive()
|
||||
|
||||
if message["type"] == "websocket.receive":
|
||||
@@ -53,7 +53,7 @@ class WebSocketConnection:
|
||||
return message["text"]
|
||||
except KeyError:
|
||||
try:
|
||||
return message["bytes"]
|
||||
return message["bytes"].decode()
|
||||
except KeyError:
|
||||
raise InvalidUsage("Bad ASGI message received")
|
||||
elif message["type"] == "websocket.disconnect":
|
||||
|
||||
@@ -2,6 +2,7 @@ from pathlib import Path
|
||||
|
||||
from sanic import Sanic
|
||||
from sanic.exceptions import SanicException
|
||||
from sanic.response import redirect
|
||||
|
||||
|
||||
def create_simple_server(directory: Path):
|
||||
@@ -11,8 +12,10 @@ def create_simple_server(directory: Path):
|
||||
)
|
||||
|
||||
app = Sanic("SimpleServer")
|
||||
app.static(
|
||||
"/", directory, name="main", directory_view=True, index="index.html"
|
||||
)
|
||||
app.static("/", directory, name="main")
|
||||
|
||||
@app.get("/")
|
||||
def index(_):
|
||||
return redirect(app.url_for("main", filename="index.html"))
|
||||
|
||||
return app
|
||||
|
||||
6
setup.py
6
setup.py
@@ -35,7 +35,6 @@ def open_local(paths, mode="r", encoding="utf8"):
|
||||
|
||||
return codecs.open(path, mode, encoding)
|
||||
|
||||
|
||||
def str_to_bool(val: str) -> bool:
|
||||
val = val.lower()
|
||||
if val in {
|
||||
@@ -56,7 +55,6 @@ def str_to_bool(val: str) -> bool:
|
||||
else:
|
||||
raise ValueError(f"Invalid truth value {val}")
|
||||
|
||||
|
||||
with open_local(["sanic", "__version__.py"], encoding="latin1") as fp:
|
||||
try:
|
||||
version = re.findall(
|
||||
@@ -81,7 +79,7 @@ setup_kwargs = {
|
||||
),
|
||||
"long_description": long_description,
|
||||
"packages": find_packages(exclude=("tests", "tests.*")),
|
||||
"package_data": {"sanic": ["py.typed", "pages/styles/*"]},
|
||||
"package_data": {"sanic": ["py.typed"]},
|
||||
"platforms": "any",
|
||||
"python_requires": ">=3.7",
|
||||
"classifiers": [
|
||||
@@ -111,8 +109,6 @@ requirements = [
|
||||
"aiofiles>=0.6.0",
|
||||
"websockets>=10.0",
|
||||
"multidict>=5.0,<7.0",
|
||||
"html5tagger>=1.2.1",
|
||||
"tracerite>=1.0.0",
|
||||
]
|
||||
|
||||
tests_require = [
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
import asyncio
|
||||
import inspect
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
@@ -60,6 +58,7 @@ CACHE: Dict[str, Any] = {}
|
||||
|
||||
|
||||
class RouteStringGenerator:
|
||||
|
||||
ROUTE_COUNT_PER_DEPTH = 100
|
||||
HTTP_METHODS = HTTP_METHODS
|
||||
ROUTE_PARAM_TYPES = ["str", "int", "float", "alpha", "uuid"]
|
||||
@@ -233,12 +232,3 @@ def urlopen():
|
||||
urlopen.read = Mock()
|
||||
with patch("sanic.cli.inspector_client.urlopen", urlopen):
|
||||
yield urlopen
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def static_file_directory():
|
||||
"""The static directory to serve"""
|
||||
current_file = inspect.getfile(inspect.currentframe())
|
||||
current_directory = os.path.dirname(os.path.abspath(current_file))
|
||||
static_directory = os.path.join(current_directory, "static")
|
||||
return static_directory
|
||||
|
||||
@@ -36,7 +36,6 @@ def test_app_loop_running(app: Sanic):
|
||||
assert response.text == "pass"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
def test_create_asyncio_server(app: Sanic):
|
||||
loop = asyncio.get_event_loop()
|
||||
asyncio_srv_coro = app.create_server(return_asyncio_server=True)
|
||||
@@ -45,7 +44,6 @@ def test_create_asyncio_server(app: Sanic):
|
||||
assert srv.is_serving() is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
def test_asyncio_server_no_start_serving(app: Sanic):
|
||||
loop = asyncio.get_event_loop()
|
||||
asyncio_srv_coro = app.create_server(
|
||||
@@ -57,7 +55,6 @@ def test_asyncio_server_no_start_serving(app: Sanic):
|
||||
assert srv.is_serving() is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
def test_asyncio_server_start_serving(app: Sanic):
|
||||
loop = asyncio.get_event_loop()
|
||||
asyncio_srv_coro = app.create_server(
|
||||
@@ -75,7 +72,6 @@ def test_asyncio_server_start_serving(app: Sanic):
|
||||
# Looks like we can't easily test `serve_forever()`
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
def test_create_server_main(app: Sanic, caplog):
|
||||
app.listener("main_process_start")(lambda *_: ...)
|
||||
loop = asyncio.get_event_loop()
|
||||
@@ -90,7 +86,6 @@ def test_create_server_main(app: Sanic, caplog):
|
||||
) in caplog.record_tuples
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
def test_create_server_no_startup(app: Sanic):
|
||||
loop = asyncio.get_event_loop()
|
||||
asyncio_srv_coro = app.create_server(
|
||||
@@ -106,7 +101,6 @@ def test_create_server_no_startup(app: Sanic):
|
||||
loop.run_until_complete(srv.start_serving())
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
def test_create_server_main_convenience(app: Sanic, caplog):
|
||||
app.main_process_start(lambda *_: ...)
|
||||
loop = asyncio.get_event_loop()
|
||||
@@ -132,6 +126,7 @@ def test_app_loop_not_running(app: Sanic):
|
||||
|
||||
|
||||
def test_app_run_raise_type_error(app: Sanic):
|
||||
|
||||
with pytest.raises(TypeError) as excinfo:
|
||||
app.run(loop="loop")
|
||||
|
||||
@@ -144,6 +139,7 @@ def test_app_run_raise_type_error(app: Sanic):
|
||||
|
||||
|
||||
def test_app_route_raise_value_error(app: Sanic):
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
|
||||
@app.route("/test")
|
||||
@@ -225,6 +221,7 @@ def test_app_websocket_parameters(websocket_protocol_mock, app: Sanic):
|
||||
|
||||
|
||||
def test_handle_request_with_nested_exception(app: Sanic, monkeypatch):
|
||||
|
||||
err_msg = "Mock Exception"
|
||||
|
||||
def mock_error_handler_response(*args, **kwargs):
|
||||
@@ -244,6 +241,7 @@ def test_handle_request_with_nested_exception(app: Sanic, monkeypatch):
|
||||
|
||||
|
||||
def test_handle_request_with_nested_exception_debug(app: Sanic, monkeypatch):
|
||||
|
||||
err_msg = "Mock Exception"
|
||||
|
||||
def mock_error_handler_response(*args, **kwargs):
|
||||
@@ -472,7 +470,6 @@ def test_uvloop_config(app: Sanic, monkeypatch, use):
|
||||
try_use_uvloop.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
def test_uvloop_cannot_never_called_with_create_server(caplog, monkeypatch):
|
||||
apps = (Sanic("default-uvloop"), Sanic("no-uvloop"), Sanic("yes-uvloop"))
|
||||
|
||||
@@ -509,7 +506,6 @@ def test_uvloop_cannot_never_called_with_create_server(caplog, monkeypatch):
|
||||
assert counter[(logging.WARNING, message)] == modified
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
def test_multiple_uvloop_configs_display_warning(caplog):
|
||||
Sanic._uvloop_setting = None # Reset the setting (changed in prev tests)
|
||||
|
||||
|
||||
@@ -342,7 +342,7 @@ async def test_websocket_send(send, receive, message_stack):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_websocket_text_receive(send, receive, message_stack):
|
||||
async def test_websocket_receive(send, receive, message_stack):
|
||||
msg = {"text": "hello", "type": "websocket.receive"}
|
||||
message_stack.append(msg)
|
||||
|
||||
@@ -351,15 +351,6 @@ async def test_websocket_text_receive(send, receive, message_stack):
|
||||
|
||||
assert text == msg["text"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_websocket_bytes_receive(send, receive, message_stack):
|
||||
msg = {"bytes": b"hello", "type": "websocket.receive"}
|
||||
message_stack.append(msg)
|
||||
|
||||
ws = WebSocketConnection(send, receive)
|
||||
data = await ws.receive()
|
||||
|
||||
assert data == msg["bytes"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_websocket_accept_with_no_subprotocols(
|
||||
|
||||
@@ -148,6 +148,7 @@ def test_cookie_set_unknown_property():
|
||||
|
||||
|
||||
def test_cookie_set_same_key(app):
|
||||
|
||||
cookies = {"test": "wait"}
|
||||
|
||||
@app.get("/")
|
||||
|
||||
@@ -62,6 +62,7 @@ def exception_handler_app():
|
||||
|
||||
@exception_handler_app.route("/8", error_format="html")
|
||||
def handler_8(request):
|
||||
|
||||
raise ErrorWithRequestCtx("OK")
|
||||
|
||||
@exception_handler_app.exception(ErrorWithRequestCtx, NotFound)
|
||||
@@ -213,7 +214,7 @@ def test_error_handler_noisy_log(
|
||||
exception_handler_app: Sanic, monkeypatch: MonkeyPatch
|
||||
):
|
||||
err_logger = Mock()
|
||||
monkeypatch.setattr(handlers.error, "error_logger", err_logger)
|
||||
monkeypatch.setattr(handlers, "error_logger", err_logger)
|
||||
|
||||
exception_handler_app.config["NOISY_EXCEPTIONS"] = False
|
||||
exception_handler_app.test_client.get("/1")
|
||||
|
||||
@@ -514,6 +514,7 @@ def test_file_stream_head_response(
|
||||
def test_file_stream_response_range(
|
||||
app: Sanic, file_name, static_file_directory, size, start, end
|
||||
):
|
||||
|
||||
Range = namedtuple("Range", ["size", "start", "end", "total"])
|
||||
total = len(get_file_content(static_file_directory, file_name))
|
||||
range = Range(size=size, start=start, end=end, total=total)
|
||||
|
||||
@@ -213,3 +213,12 @@ def test_pop_list(json_app: Sanic):
|
||||
|
||||
_, resp = json_app.test_client.get("/json-pop")
|
||||
assert resp.body == json_dumps(["b"]).encode()
|
||||
|
||||
|
||||
def test_json_response_class_sets_proper_content_type(json_app: Sanic):
|
||||
@json_app.get("/json-class")
|
||||
async def handler(request: Request):
|
||||
return JSONResponse(JSON_BODY)
|
||||
|
||||
_, resp = json_app.test_client.get("/json-class")
|
||||
assert resp.headers["content-type"] == "application/json"
|
||||
|
||||
@@ -722,6 +722,7 @@ def test_add_webscoket_route_with_version(app):
|
||||
|
||||
|
||||
def test_route_duplicate(app):
|
||||
|
||||
with pytest.raises(RouteExists):
|
||||
|
||||
@app.route("/test")
|
||||
@@ -818,6 +819,7 @@ def test_unquote_add_route(app, unquote):
|
||||
|
||||
|
||||
def test_dynamic_add_route(app):
|
||||
|
||||
results = []
|
||||
|
||||
async def handler(request, name):
|
||||
@@ -832,6 +834,7 @@ def test_dynamic_add_route(app):
|
||||
|
||||
|
||||
def test_dynamic_add_route_string(app):
|
||||
|
||||
results = []
|
||||
|
||||
async def handler(request, name):
|
||||
@@ -935,6 +938,7 @@ def test_dynamic_add_route_unhashable(app):
|
||||
|
||||
|
||||
def test_add_route_duplicate(app):
|
||||
|
||||
with pytest.raises(RouteExists):
|
||||
|
||||
async def handler1(request):
|
||||
@@ -1116,6 +1120,7 @@ def test_route_raise_ParameterNameConflicts(app):
|
||||
|
||||
|
||||
def test_route_invalid_host(app):
|
||||
|
||||
host = 321
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
|
||||
|
||||
@@ -93,7 +93,6 @@ def test_dont_register_system_signals(app):
|
||||
@pytest.mark.skipif(os.name == "nt", reason="windows cannot SIGINT processes")
|
||||
def test_windows_workaround():
|
||||
"""Test Windows workaround (on any other OS)"""
|
||||
|
||||
# At least some code coverage, even though this test doesn't work on
|
||||
# Windows...
|
||||
class MockApp:
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import inspect
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
@@ -12,6 +13,15 @@ from sanic import Sanic, text
|
||||
from sanic.exceptions import FileNotFound
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def static_file_directory():
|
||||
"""The static directory to serve"""
|
||||
current_file = inspect.getfile(inspect.currentframe())
|
||||
current_directory = os.path.dirname(os.path.abspath(current_file))
|
||||
static_directory = os.path.join(current_directory, "static")
|
||||
return static_directory
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def double_dotted_directory_file(static_file_directory: str):
|
||||
"""Generate double dotted directory and its files"""
|
||||
@@ -108,12 +118,7 @@ def test_static_file_pathlib(app, static_file_directory, file_name):
|
||||
def test_static_file_bytes(app, static_file_directory, file_name):
|
||||
bsep = os.path.sep.encode("utf-8")
|
||||
file_path = static_file_directory.encode("utf-8") + bsep + file_name
|
||||
message = (
|
||||
"Serving a static directory with a bytes "
|
||||
"string is deprecated and will be removed in v22.9."
|
||||
)
|
||||
with pytest.warns(DeprecationWarning, match=message):
|
||||
app.static("/testing.file", file_path)
|
||||
app.static("/testing.file", file_path)
|
||||
request, response = app.test_client.get("/testing.file")
|
||||
assert response.status == 200
|
||||
|
||||
@@ -426,6 +431,7 @@ def test_static_stream_large_file(
|
||||
"file_name", ["test.file", "decode me.txt", "python.png"]
|
||||
)
|
||||
def test_use_modified_since(app, static_file_directory, file_name):
|
||||
|
||||
file_stat = os.stat(get_file_path(static_file_directory, file_name))
|
||||
modified_since = strftime(
|
||||
"%a, %d %b %Y %H:%M:%S GMT", gmtime(file_stat.st_mtime)
|
||||
|
||||
@@ -1,123 +0,0 @@
|
||||
import os
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from sanic import Sanic
|
||||
from sanic.handlers.directory import DirectoryHandler
|
||||
|
||||
|
||||
def get_file_path(static_file_directory, file_name):
|
||||
return os.path.join(static_file_directory, file_name)
|
||||
|
||||
|
||||
def get_file_content(static_file_directory, file_name):
|
||||
"""The content of the static file to check"""
|
||||
with open(get_file_path(static_file_directory, file_name), "rb") as file:
|
||||
return file.read()
|
||||
|
||||
|
||||
def test_static_directory_view(app: Sanic, static_file_directory: str):
|
||||
app.static("/static", static_file_directory, directory_view=True)
|
||||
|
||||
_, response = app.test_client.get("/static/")
|
||||
assert response.status == 200
|
||||
assert response.content_type == "text/html; charset=utf-8"
|
||||
assert "<title>Directory Viewer</title>" in response.text
|
||||
|
||||
|
||||
def test_static_index_single(app: Sanic, static_file_directory: str):
|
||||
app.static("/static", static_file_directory, index="test.html")
|
||||
|
||||
_, response = app.test_client.get("/static/")
|
||||
assert response.status == 200
|
||||
assert response.body == get_file_content(
|
||||
static_file_directory, "test.html"
|
||||
)
|
||||
assert response.headers["Content-Type"] == "text/html"
|
||||
|
||||
|
||||
def test_static_index_single_not_found(app: Sanic, static_file_directory: str):
|
||||
app.static("/static", static_file_directory, index="index.html")
|
||||
|
||||
_, response = app.test_client.get("/static/")
|
||||
assert response.status == 404
|
||||
|
||||
|
||||
def test_static_index_multiple(app: Sanic, static_file_directory: str):
|
||||
app.static(
|
||||
"/static",
|
||||
static_file_directory,
|
||||
index=["index.html", "test.html"],
|
||||
)
|
||||
|
||||
_, response = app.test_client.get("/static/")
|
||||
assert response.status == 200
|
||||
assert response.body == get_file_content(
|
||||
static_file_directory, "test.html"
|
||||
)
|
||||
assert response.headers["Content-Type"] == "text/html"
|
||||
|
||||
|
||||
def test_static_directory_view_and_index(
|
||||
app: Sanic, static_file_directory: str
|
||||
):
|
||||
app.static(
|
||||
"/static",
|
||||
static_file_directory,
|
||||
directory_view=True,
|
||||
index="foo.txt",
|
||||
)
|
||||
|
||||
_, response = app.test_client.get("/static/nested/")
|
||||
assert response.status == 200
|
||||
assert response.content_type == "text/html; charset=utf-8"
|
||||
assert "<title>Directory Viewer</title>" in response.text
|
||||
|
||||
_, response = app.test_client.get("/static/nested/dir/")
|
||||
assert response.status == 200
|
||||
assert response.body == get_file_content(
|
||||
f"{static_file_directory}/nested/dir", "foo.txt"
|
||||
)
|
||||
assert response.content_type == "text/plain"
|
||||
|
||||
|
||||
def test_static_directory_handler(app: Sanic, static_file_directory: str):
|
||||
dh = DirectoryHandler(
|
||||
"/static",
|
||||
Path(static_file_directory),
|
||||
directory_view=True,
|
||||
index="foo.txt",
|
||||
)
|
||||
app.static("/static", static_file_directory, directory_handler=dh)
|
||||
|
||||
_, response = app.test_client.get("/static/nested/")
|
||||
assert response.status == 200
|
||||
assert response.content_type == "text/html; charset=utf-8"
|
||||
assert "<title>Directory Viewer</title>" in response.text
|
||||
|
||||
_, response = app.test_client.get("/static/nested/dir/")
|
||||
assert response.status == 200
|
||||
assert response.body == get_file_content(
|
||||
f"{static_file_directory}/nested/dir", "foo.txt"
|
||||
)
|
||||
assert response.content_type == "text/plain"
|
||||
|
||||
|
||||
def test_static_directory_handler_fails(app: Sanic):
|
||||
dh = DirectoryHandler(
|
||||
"/static",
|
||||
Path(""),
|
||||
directory_view=True,
|
||||
index="foo.txt",
|
||||
)
|
||||
message = (
|
||||
"When explicitly setting directory_handler, you cannot "
|
||||
"set either directory_view or index. Instead, pass "
|
||||
"these arguments to your DirectoryHandler instance."
|
||||
)
|
||||
with pytest.raises(ValueError, match=message):
|
||||
app.static("/static", "", directory_handler=dh, directory_view=True)
|
||||
with pytest.raises(ValueError, match=message):
|
||||
app.static("/static", "", directory_handler=dh, index="index.html")
|
||||
@@ -654,6 +654,7 @@ def test_sanic_ssl_context_create():
|
||||
reason="This test requires fork context",
|
||||
)
|
||||
def test_ssl_in_multiprocess_mode(app: Sanic, caplog):
|
||||
|
||||
ssl_dict = {"cert": localhost_cert, "key": localhost_key}
|
||||
event = Event()
|
||||
|
||||
|
||||
@@ -176,6 +176,7 @@ def handler(request: Request):
|
||||
|
||||
async def client(app: Sanic, loop: AbstractEventLoop):
|
||||
try:
|
||||
|
||||
transport = httpx.AsyncHTTPTransport(uds=SOCKPATH)
|
||||
async with httpx.AsyncClient(transport=transport) as client:
|
||||
r = await client.get("http://myhost.invalid/")
|
||||
|
||||
@@ -83,6 +83,7 @@ def test_simple_url_for_getting_with_more_params(app, args, url):
|
||||
|
||||
|
||||
def test_url_for_with_server_name(app):
|
||||
|
||||
server_name = f"{test_host}:{test_port}"
|
||||
app.config.update({"SERVER_NAME": server_name})
|
||||
path = "/myurl"
|
||||
|
||||
@@ -38,6 +38,7 @@ def test_load_module_from_file_location_with_non_existing_env_variable():
|
||||
LoadFileException,
|
||||
match="The following environment variables are not set: MuuMilk",
|
||||
):
|
||||
|
||||
load_module_from_file_location("${MuuMilk}")
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user