Compare commits

...

77 Commits

Author SHA1 Message Date
Adam Hopkins
d9f7086ee6 Add color changes 2023-02-16 14:59:09 +02:00
SML
26e999dec0 Switch blue to code block orange in sanic docs. Add sanic highlight override. Add darker sanic background (111) as option but in the comments so we can switch back if need be. 2023-02-09 16:49:16 +08:00
SML
8f305047c0 Add header background, change all dark theme grays to neutral grays. 2023-02-09 16:35:20 +08:00
SML
59f9b5cc28 Ensure sanic brand color be the same in dark mode. 2023-02-09 16:23:01 +08:00
SML
94eff28fda Add tab hover shadow definition. Intended to have better hover accents for light theme. 2023-02-09 07:03:09 +08:00
SML
c111d93840 Define highlight and tab colors as variables 2023-02-09 06:46:13 +08:00
SML
3587a4fe15 Define sanic-background, sanic-text as top-level colors. Restructure color schemes to top of definition. 2023-02-09 06:13:00 +08:00
SML
51a9605668 Soften lightmode text color 2023-02-09 06:08:00 +08:00
SML
14f16352fc Add tracerite tab color with new sanic-tab variable. 2023-02-09 06:07:45 +08:00
SML
ecf34896c8 Add margins for h3. Probably should be done in the tracerite package. 2023-02-09 04:56:30 +08:00
SML
e544cd8af6 Tweak color scheme for dark themes 2023-02-09 04:55:57 +08:00
SML
aa8af0dcea invalid text justify css 2023-02-09 03:36:20 +08:00
SML
db0b9046d7 Add padding for main container so text doesn’t touch screen boundary 2023-02-09 00:39:26 +08:00
SML
f798eda446 Remove forced hanging emoji from error header 2023-02-09 00:36:20 +08:00
SML
21ad1ae61b Remove tab styling 2023-02-09 00:14:06 +08:00
SML
d8bf65ad1b Tweak colors for dark mode 2023-02-09 00:13:39 +08:00
SML
844cab2d6b Tweak colors for light mode 2023-02-09 00:07:29 +08:00
SML
b0cb01d1a4 Add color sections for dark mode 2023-02-08 23:59:32 +08:00
L. Karkkainen
4c55051442 Fix scrollbar layout problems. 2023-02-08 01:34:41 +00:00
L. Karkkainen
37f3607ebc Friendlify the error page. 2023-02-08 01:26:30 +00:00
L. Karkkainen
7e617c1769 Misc styling 2023-02-07 22:03:10 +00:00
L. Karkkainen
a5f732cc80 Style and layout changes, fix document title. 2023-02-07 21:25:14 +00:00
L. Karkkainen
ce19908bc0 Implement key-value-table as dl instead of table element. 2023-02-07 20:51:25 +00:00
L. Karkkainen
a6efebda56 Cleanup. 2023-02-07 11:00:05 +00:00
L. Karkkainen
da9ff33fa7 Add traceback suppressions to commonly shown internals. 2023-02-07 10:58:46 +00:00
L. Karkkainen
526115c3c5 Fix SVG: alt to desc, whitespace removed 2023-02-06 16:07:58 +00:00
Adam Hopkins
12ba685bf6 Some Sanic branding to error page (#2673)
* Some Sanic branding to error page

* Simplify logo compat
2023-02-06 15:41:01 +00:00
L. Karkkainen
39b98e6b45 Logo shadow to make it show up on white background. 2023-02-06 13:13:01 +00:00
L. Karkkainen
3ddbda61d9 Merge branch 'html-error-handling-suggestions' into niceback-error-handling 2023-02-06 13:02:31 +00:00
L. Karkkainen
68bf26df17 Merge branch 'main' into niceback-error-handling 2023-02-06 12:57:48 +00:00
Adam Hopkins
a773ad2354 Cleanup styles and add Sanic palette 2023-02-06 14:25:22 +02:00
Adam Hopkins
47b2459811 Suggestions 2023-02-06 10:05:51 +02:00
Adam Hopkins
7491d567a3 Merge branch 'niceback-error-handling' of github.com:sanic-org/sanic into niceback-error-handling 2023-02-05 22:13:03 +02:00
L. Karkkainen
783a29bc0b Require tracerite=>1.0.0 2023-02-05 18:49:27 +00:00
L. Karkkainen
cf76c05d3f CSS loading with STYLE_APPEND 2023-02-05 17:26:58 +00:00
Adam Hopkins
d6f2613623 Merge branch 'main' of github.com:sanic-org/sanic into niceback-error-handling 2023-02-05 18:37:08 +02:00
L. Karkkainen
a6ff13ceed Fallback for route name when no route found. 2023-02-05 16:06:45 +00:00
L. Karkkainen
207f8af11f setup.py 2023-02-05 15:58:25 +00:00
L. Karkkainen
5c65118d12 CSS files now start with comments, none inserted in code. 2023-02-05 15:58:09 +00:00
L. Karkkainen
f4792a2bc6 CSS fixes, incl. scaling layout for small screens. 2023-02-05 15:46:12 +00:00
L. Karkkainen
53b7a5a5a1 Merge branch 'main' into niceback-error-handling 2023-02-05 15:16:05 +00:00
Ryu Juheon
5e7f6998bd fix(websocket): ASGI websocket must pass thru bytes as is (#2651) 2023-02-05 16:41:54 +02:00
L. Kärkkäinen
c7a71cd00c Use FALLBACK_ERROR_FORMAT for handlers that return empty() (#2659)
Co-authored-by: L. Karkkainen <tronic@users.noreply.github.com>
Co-authored-by: Adam Hopkins <adam@amhopkins.com>
2023-02-05 15:29:01 +02:00
Adam Hopkins
9cb9e88678 Establish basic file browser and index fallback (#2662)
Co-authored-by: L. Kärkkäinen <98187+Tronic@users.noreply.github.com>
Co-authored-by: L. Karkkainen <tronic@users.noreply.github.com>
2023-02-05 15:09:04 +02:00
L. Karkkainen
ebc2f46682 Minor style changes, loading TraceRite style and css properly, printing exception extra data. 2023-02-04 00:27:29 +00:00
L. Kärkkäinen
ff47448585 Merge branch 'issue2661' into niceback-error-handling 2023-02-02 14:14:56 +00:00
Adam Hopkins
2df5b19fd4 Sans serif w/ autoindex monospace 2023-01-31 11:09:24 +02:00
Adam Hopkins
5dfd48f855 Merge branch 'main' of github.com:sanic-org/sanic into issue2661 2023-01-31 10:44:49 +02:00
Adam Hopkins
ddf3a49988 Remove accidental file 2023-01-31 10:43:19 +02:00
Adam Hopkins
1b43aa5f2f No auto registration of fallback error handlers 2023-01-31 10:40:47 +02:00
Adam Hopkins
713abe3cf2 Merge branch 'issue2661' of github.com:sanic-org/sanic into issue2661 2023-01-31 10:04:46 +02:00
Adam Hopkins
b46b81d43a Set styles 2023-01-31 10:03:39 +02:00
Rodolfo Olivieri
30c53b6857 Remove deprecated property in blueprint (#2666)
Fixes https://github.com/sanic-org/sanic/issues/2442
2023-01-30 09:26:55 +02:00
L. Kärkkäinen
4f000ab59c Define colors for light/default mode as well.
The background is not always white by default (e.g. when in an iframe), so it needs to be defined or text may become invisible (just had this problem today). I've taken the opportunity to make it slightly less bright as well.
2023-01-28 23:36:42 +00:00
L. Kärkkäinen
ae757c8ad6 Merge branch 'issue2661' into niceback-error-handling 2023-01-28 23:29:14 +00:00
Adam Hopkins
f30f53f67d Move DirectoryHandler to app instance 2023-01-28 23:25:25 +02:00
L. Karkkainen
ea09906e0a Refactored to sanic.pages.error module. Traceback and style tuning. Print also headers, and for other than 500 errors as well. 500 error message text UX workaround. 2023-01-27 18:50:35 +00:00
L. Karkkainen
77bdfa14ed HTML error formatting rebuilt on top of BasePage, and using external module niceback to do the tracebacks. 2023-01-27 06:45:49 +00:00
L. Karkkainen
faf1ff8d4f Fix the document title (needs positional argument). 2023-01-27 05:39:45 +00:00
L. Karkkainen
b5175238fb URL sanitation. 2023-01-27 05:31:52 +00:00
L. Karkkainen
32d62c2db4 Style tweaks 2023-01-27 05:24:02 +00:00
L. Karkkainen
a00ec8ab37 Better UX for empty folders. 2023-01-27 04:38:49 +00:00
L. Karkkainen
859a8130c1 Fix Sanic brand colour in breadcrumbs. 2023-01-27 04:38:21 +00:00
L. Karkkainen
2038799d7a Remove parent directory link from table 2023-01-27 04:30:23 +00:00
L. Karkkainen
41da8bbd61 Improve navigation with breadcrumbs 2023-01-27 04:29:15 +00:00
L. Karkkainen
e328d4406b Timestamps a bit less ugly. 2023-01-27 03:23:12 +00:00
L. Karkkainen
d9c883eb9b Add a header bar with Sanic logo. Remove duplicate title element. Avoid escaping in style element and add new styles. 2023-01-27 03:05:00 +00:00
Adam Hopkins
10d4f2803a Simple server to include autoindex 2023-01-26 18:06:13 +02:00
Adam Hopkins
fa6dbddf69 Style fixes for file table 2023-01-26 17:26:42 +02:00
Adam Hopkins
2c8f1807d8 squash 2023-01-26 17:22:13 +02:00
Adam Hopkins
ca0e933813 No logging of exception 2023-01-26 17:12:51 +02:00
Adam Hopkins
2e36507a60 Refactor to allow for common pages 2023-01-26 16:50:45 +02:00
Adam Hopkins
39a4a75dcb Add new pages module 2023-01-26 15:52:26 +02:00
Adam Hopkins
e8bb2834d6 Valid HTML5 2023-01-26 00:45:30 +02:00
Adam Hopkins
36e3cc9df7 Use html5tagger for AutoIndex 2023-01-26 00:36:37 +02:00
Adam Hopkins
fed2ef3527 Remove location information 2023-01-24 10:48:54 +02:00
Adam Hopkins
6673acf544 Establish basic file browser and index fallback 2023-01-24 10:27:55 +02:00
54 changed files with 1342 additions and 526 deletions

View File

@@ -9,6 +9,7 @@ omit =
sanic/simple.py
sanic/utils.py
sanic/cli
sanic/pages
[html]
directory = coverage

View File

@@ -17,7 +17,8 @@ ignore:
- "sanic/compat.py"
- "sanic/simple.py"
- "sanic/utils.py"
- "sanic/cli"
- "sanic/cli/"
- "sanic/pages/"
- ".github/"
- "changelogs/"
- "docker/"

View File

@@ -23,5 +23,6 @@ module = [
"trustme.*",
"sanic_routing.*",
"aioquic.*",
"html5tagger.*",
]
ignore_missing_imports = true

View File

@@ -1 +1 @@
__version__ = "22.12.0"
__version__ = "23.3.0"

View File

@@ -72,6 +72,7 @@ from sanic.log import (
from sanic.middleware import Middleware, MiddlewareLocation
from sanic.mixins.listeners import ListenerEvent
from sanic.mixins.startup import StartupMixin
from sanic.mixins.static import StaticHandleMixin
from sanic.models.futures import (
FutureException,
FutureListener,
@@ -79,7 +80,6 @@ from sanic.models.futures import (
FutureRegistry,
FutureRoute,
FutureSignal,
FutureStatic,
)
from sanic.models.handler_types import ListenerType, MiddlewareType
from sanic.models.handler_types import Sanic as SanicVar
@@ -106,7 +106,7 @@ if OS_IS_WINDOWS: # no cov
enable_windows_color_support()
class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
class Sanic(StaticHandleMixin, BaseSanic, StartupMixin, metaclass=TouchUpMeta):
"""
The main application instance
"""
@@ -157,6 +157,7 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
"strict_slashes",
"websocket_enabled",
"websocket_tasks",
"wrappers",
)
_app_registry: Dict[str, "Sanic"] = {}
@@ -441,9 +442,6 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
return routes
def _apply_static(self, static: FutureStatic) -> Route:
return self._register_static(static)
def _apply_middleware(
self,
middleware: FutureMiddleware,
@@ -878,6 +876,8 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
:param request: HTTP Request object
:return: Nothing
"""
__tracebackhide__ = True
await self.dispatch(
"http.lifecycle.handle",
inline=True,
@@ -890,11 +890,11 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
Union[
BaseHTTPResponse,
Coroutine[Any, Any, Optional[BaseHTTPResponse]],
ResponseStream,
]
] = None
run_middleware = True
try:
await self.dispatch(
"http.routing.before",
inline=True,
@@ -926,7 +926,6 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
and request.stream.request_body
and not route.extra.ignore_body
):
if hasattr(handler, "is_stream"):
# Streaming handler: lift the size limit
request.stream.request_max_size = float("inf")
@@ -1000,7 +999,7 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
...
await response.send(end_stream=True)
elif isinstance(response, ResponseStream):
resp = await response(request) # type: ignore
resp = await response(request)
await self.dispatch(
"http.lifecycle.response",
inline=True,
@@ -1009,7 +1008,7 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
"response": resp,
},
)
await response.eof() # type: ignore
await response.eof()
else:
if not hasattr(handler, "is_websocket"):
raise ServerError(

View File

@@ -40,6 +40,8 @@ FULL_COLOR_LOGO = """
""" # noqa
SVG_LOGO_SIMPLE = """<svg id=logo-simple viewBox="0 0 964 279"><desc>Sanic</desc><path d="M107 222c9-2 10-20 1-22s-20-2-30-2-17 7-16 14 6 10 15 10h30zm115-1c16-2 30-11 35-23s6-24 2-33-6-14-15-20-24-11-38-10c-7 3-10 13-5 19s17-1 24 4 15 14 13 24-5 15-14 18-50 0-74 0h-17c-6 4-10 15-4 20s16 2 23 3zM251 83q9-1 9-7 0-15-10-16h-13c-10 6-10 20 0 22zM147 60c-4 0-10 3-11 11s5 13 10 12 42 0 67 0c5-3 7-10 6-15s-4-8-9-8zm-33 1c-8 0-16 0-24 3s-20 10-25 20-6 24-4 36 15 22 26 27 78 8 94 3c4-4 4-12 0-18s-69 8-93-10c-8-7-9-23 0-30s12-10 20-10 12 2 16-3 1-15-5-18z" fill="#ff0d68"/><path d="M676 74c0-14-18-9-20 0s0 30 0 39 20 9 20 2zm-297-10c-12 2-15 12-23 23l-41 58H340l22-30c8-12 23-13 30-4s20 24 24 38-10 10-17 10l-68 2q-17 1-48 30c-7 6-10 20 0 24s15-8 20-13 20 -20 58-21h50 c20 2 33 9 52 30 8 10 24-4 16-13L384 65q-3-2-5-1zm131 0c-10 1-12 12-11 20v96c1 10-3 23 5 32s20-5 17-15c0-23-3-46 2-67 5-12 22-14 32-5l103 87c7 5 19 1 18-9v-64c-3-10-20-9-21 2s-20 22-30 13l-97-80c-5-4-10-10-18-10zM701 76v128c2 10 15 12 20 4s0-102 0-124s-20-18-20-7z M850 63c-35 0-69-2-86 15s-20 60-13 66 13 8 16 0 1-10 1-27 12-26 20-32 66-5 85-5 31 4 31-10-18-7-54-7M764 159c-6-2-15-2-16 12s19 37 33 43 23 8 25-4-4-11-11-14q-9-3-22-18c-4-7-3-16-10-19zM828 196c-4 0-8 1-10 5s-4 12 0 15 8 2 12 2h60c5 0 10-2 12-6 3-7-1-16-8-16z" fill="#1f1f1f"/></svg>""" # noqa
ansi_pattern = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

View File

@@ -9,6 +9,7 @@ from sanic.mixins.listeners import ListenerMixin
from sanic.mixins.middleware import MiddlewareMixin
from sanic.mixins.routes import RouteMixin
from sanic.mixins.signals import SignalMixin
from sanic.mixins.static import StaticMixin
VALID_NAME = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_\-]*$")
@@ -16,6 +17,7 @@ VALID_NAME = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_\-]*$")
class BaseSanic(
RouteMixin,
StaticMixin,
MiddlewareMixin,
ListenerMixin,
ExceptionMixin,

View File

@@ -105,6 +105,7 @@ class Blueprint(BaseSanic):
"version",
"version_prefix",
"websocket_routes",
"wrappers",
)
def __init__(
@@ -304,9 +305,6 @@ class Blueprint(BaseSanic):
# Routes
for future in self._future_routes:
# attach the blueprint name to the handler so that it can be
# prefixed properly in the router
future.handler.__blueprintname__ = self.name
# Prepend the blueprint URI prefix if available
uri = self._setup_uri(future.uri, url_prefix)

View File

@@ -4,7 +4,6 @@ from sanic.compat import UpperStrEnum
class HTTPMethod(UpperStrEnum):
GET = auto()
POST = auto()
PUT = auto()
@@ -15,7 +14,6 @@ class HTTPMethod(UpperStrEnum):
class LocalCertCreator(UpperStrEnum):
AUTO = auto()
TRUSTME = auto()
MKCERT = auto()

View File

@@ -12,6 +12,7 @@ Setting ``app.config.FALLBACK_ERROR_FORMAT = "auto"`` will enable a switch that
will attempt to provide an appropriate response format based upon the
request type.
"""
from __future__ import annotations
import sys
import typing as t
@@ -21,8 +22,8 @@ from traceback import extract_tb
from sanic.exceptions import BadRequest, SanicException
from sanic.helpers import STATUS_CODES
from sanic.request import Request
from sanic.response import HTTPResponse, html, json, text
from sanic.pages.error import ErrorPage
from sanic.response import html, json, text
dumps: t.Callable[..., str]
@@ -33,6 +34,8 @@ try:
except ImportError: # noqa
from json import dumps
if t.TYPE_CHECKING:
from sanic import HTTPResponse, Request
DEFAULT_FORMAT = "auto"
FALLBACK_TEXT = (
@@ -157,36 +160,21 @@ class HTMLRenderer(BaseRenderer):
"{body}"
)
def full(self) -> HTTPResponse:
return html(
self.OUTPUT_HTML.format(
title=self.title,
text=self.text,
style=self.TRACEBACK_STYLE,
body=self._generate_body(full=True),
),
status=self.status,
def _page(self, full: bool) -> HTTPResponse:
page = ErrorPage(
title=super().title,
text=super().text,
request=self.request,
exc=self.exception,
full=full,
)
return html(page.render(), status=self.status, headers=self.headers)
def full(self) -> HTTPResponse:
return self._page(full=True)
def minimal(self) -> HTTPResponse:
return html(
self.OUTPUT_HTML.format(
title=self.title,
text=self.text,
style=self.TRACEBACK_STYLE,
body=self._generate_body(full=False),
),
status=self.status,
headers=self.headers,
)
@property
def text(self):
return escape(super().text)
@property
def title(self):
return escape(f"⚠️ {super().title}")
return self._page(full=False)
def _generate_body(self, *, full):
lines = []
@@ -404,16 +392,13 @@ CONTENT_TYPE_BY_RENDERERS = {
v: k for k, v in RENDERERS_BY_CONTENT_TYPE.items()
}
# Handler source code is checked for which response types it returns with the
# route error_format="auto" (default) to determine which format to use.
RESPONSE_MAPPING = {
"empty": "html",
"json": "json",
"text": "text",
"raw": "text",
"html": "html",
"file": "html",
"file_stream": "text",
"stream": "text",
"redirect": "html",
"JSONResponse": "json",
"text/plain": "text",
"text/html": "html",
"application/json": "json",

View File

@@ -0,0 +1,10 @@
from .content_range import ContentRangeHandler
from .directory import DirectoryHandler
from .error import ErrorHandler
__all__ = (
"ContentRangeHandler",
"DirectoryHandler",
"ErrorHandler",
)

View File

@@ -0,0 +1,78 @@
from __future__ import annotations
from sanic.exceptions import (
HeaderNotFound,
InvalidRangeType,
RangeNotSatisfiable,
)
class ContentRangeHandler:
"""
A mechanism to parse and process the incoming request headers to
extract the content range information.
:param request: Incoming api request
:param stats: Stats related to the content
:type request: :class:`sanic.request.Request`
:type stats: :class:`posix.stat_result`
:ivar start: Content Range start
:ivar end: Content Range end
:ivar size: Length of the content
:ivar total: Total size identified by the :class:`posix.stat_result`
instance
:ivar ContentRangeHandler.headers: Content range header ``dict``
"""
__slots__ = ("start", "end", "size", "total", "headers")
def __init__(self, request, stats):
self.total = stats.st_size
_range = request.headers.getone("range", None)
if _range is None:
raise HeaderNotFound("Range Header Not Found")
unit, _, value = tuple(map(str.strip, _range.partition("=")))
if unit != "bytes":
raise InvalidRangeType(
"%s is not a valid Range Type" % (unit,), self
)
start_b, _, end_b = tuple(map(str.strip, value.partition("-")))
try:
self.start = int(start_b) if start_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (start_b,), self
)
try:
self.end = int(end_b) if end_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (end_b,), self
)
if self.end is None:
if self.start is None:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
else:
# this case represents `Content-Range: bytes 5-`
self.end = self.total - 1
else:
if self.start is None:
# this case represents `Content-Range: bytes -5`
self.start = self.total - self.end
self.end = self.total - 1
if self.start >= self.end:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
self.size = self.end - self.start + 1
self.headers = {
"Content-Range": "bytes %s-%s/%s"
% (self.start, self.end, self.total)
}
def __bool__(self):
return self.size > 0

View File

@@ -0,0 +1,84 @@
from __future__ import annotations
from datetime import datetime
from operator import itemgetter
from pathlib import Path
from stat import S_ISDIR
from typing import Dict, Iterable, Optional, Sequence, Union, cast
from sanic.exceptions import NotFound
from sanic.pages.directory_page import DirectoryPage, FileInfo
from sanic.request import Request
from sanic.response import file, html, redirect
class DirectoryHandler:
def __init__(
self,
uri: str,
directory: Path,
directory_view: bool = False,
index: Optional[Union[str, Sequence[str]]] = None,
) -> None:
if isinstance(index, str):
index = [index]
elif index is None:
index = []
self.base = uri.strip("/")
self.directory = directory
self.directory_view = directory_view
self.index = tuple(index)
async def handle(self, request: Request, path: str):
current = path.strip("/")[len(self.base) :].strip("/") # noqa: E203
for file_name in self.index:
index_file = self.directory / current / file_name
if index_file.is_file():
return await file(index_file)
if self.directory_view:
return self._index(
self.directory / current, path, request.app.debug
)
if self.index:
raise NotFound("File not found")
raise IsADirectoryError(f"{self.directory.as_posix()} is a directory")
def _index(self, location: Path, path: str, debug: bool):
# Remove empty path elements, append slash
if "//" in path or not path.endswith("/"):
return redirect(
"/" + "".join([f"{p}/" for p in path.split("/") if p])
)
# Render file browser
page = DirectoryPage(self._iter_files(location), path, debug)
return html(page.render())
def _prepare_file(self, path: Path) -> Dict[str, Union[int, str]]:
stat = path.stat()
modified = (
datetime.fromtimestamp(stat.st_mtime)
.isoformat()[:19]
.replace("T", " ")
)
is_dir = S_ISDIR(stat.st_mode)
icon = "📁" if is_dir else "📄"
file_name = path.name
if is_dir:
file_name += "/"
return {
"priority": is_dir * -1,
"file_name": file_name,
"icon": icon,
"file_access": modified,
"file_size": stat.st_size,
}
def _iter_files(self, location: Path) -> Iterable[FileInfo]:
prepared = [self._prepare_file(f) for f in location.iterdir()]
for item in sorted(prepared, key=itemgetter("priority", "file_name")):
del item["priority"]
yield cast(FileInfo, item)

View File

@@ -3,11 +3,6 @@ from __future__ import annotations
from typing import Dict, List, Optional, Tuple, Type
from sanic.errorpages import BaseRenderer, TextRenderer, exception_response
from sanic.exceptions import (
HeaderNotFound,
InvalidRangeType,
RangeNotSatisfiable,
)
from sanic.log import deprecation, error_logger
from sanic.models.handler_types import RouteHandler
from sanic.response import text
@@ -23,7 +18,6 @@ class ErrorHandler:
by the developers to perform a wide range of tasks from recording the error
stats to reporting them to an external service that can be used for
realtime alerting system.
"""
def __init__(
@@ -196,74 +190,3 @@ class ErrorHandler:
error_logger.exception(
"Exception occurred while handling uri: %s", url
)
class ContentRangeHandler:
"""
A mechanism to parse and process the incoming request headers to
extract the content range information.
:param request: Incoming api request
:param stats: Stats related to the content
:type request: :class:`sanic.request.Request`
:type stats: :class:`posix.stat_result`
:ivar start: Content Range start
:ivar end: Content Range end
:ivar size: Length of the content
:ivar total: Total size identified by the :class:`posix.stat_result`
instance
:ivar ContentRangeHandler.headers: Content range header ``dict``
"""
__slots__ = ("start", "end", "size", "total", "headers")
def __init__(self, request, stats):
self.total = stats.st_size
_range = request.headers.getone("range", None)
if _range is None:
raise HeaderNotFound("Range Header Not Found")
unit, _, value = tuple(map(str.strip, _range.partition("=")))
if unit != "bytes":
raise InvalidRangeType(
"%s is not a valid Range Type" % (unit,), self
)
start_b, _, end_b = tuple(map(str.strip, value.partition("-")))
try:
self.start = int(start_b) if start_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (start_b,), self
)
try:
self.end = int(end_b) if end_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (end_b,), self
)
if self.end is None:
if self.start is None:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
else:
# this case represents `Content-Range: bytes 5-`
self.end = self.total - 1
else:
if self.start is None:
# this case represents `Content-Range: bytes -5`
self.start = self.total - self.end
self.end = self.total - 1
if self.start >= self.end:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
self.size = self.end - self.start + 1
self.headers = {
"Content-Range": "bytes %s-%s/%s"
% (self.start, self.end, self.total)
}
def __bool__(self):
return self.size > 0

View File

@@ -126,7 +126,6 @@ class CertCreator(ABC):
local_tls_key,
local_tls_cert,
) -> CertCreator:
creator: Optional[CertCreator] = None
cert_creator_options: Tuple[

35
sanic/mixins/base.py Normal file
View File

@@ -0,0 +1,35 @@
from typing import Optional
from sanic.base.meta import SanicMeta
class BaseMixin(metaclass=SanicMeta):
name: str
strict_slashes: Optional[bool]
def _generate_name(self, *objects) -> str:
name = None
for obj in objects:
if obj:
if isinstance(obj, str):
name = obj
break
try:
name = obj.name
except AttributeError:
try:
name = obj.__name__
except AttributeError:
continue
else:
break
if not name: # noqa
raise ValueError("Could not generate a name for handler")
if not name.startswith(f"{self.name}."):
name = f"{self.name}.{name}"
return name

View File

@@ -14,6 +14,7 @@ class MiddlewareMixin(metaclass=SanicMeta):
def __init__(self, *args, **kwargs) -> None:
self._future_middleware: List[FutureMiddleware] = []
self.wrappers = []
def _apply_middleware(self, middleware: FutureMiddleware):
raise NotImplementedError # noqa
@@ -140,3 +141,7 @@ class MiddlewareMixin(metaclass=SanicMeta):
reverse=True,
)[::-1]
)
def wrap(self, handler):
self.wrappers.append(handler)
return handler

View File

@@ -1,11 +1,6 @@
from ast import NodeVisitor, Return, parse
from contextlib import suppress
from email.utils import formatdate
from functools import partial, wraps
from inspect import getsource, signature
from mimetypes import guess_type
from os import path
from pathlib import Path, PurePath
from textwrap import dedent
from typing import (
Any,
@@ -19,20 +14,15 @@ from typing import (
Union,
cast,
)
from urllib.parse import unquote
from sanic_routing.route import Route
from sanic.base.meta import SanicMeta
from sanic.compat import stat_async
from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE, HTTP_METHODS
from sanic.constants import HTTP_METHODS
from sanic.errorpages import RESPONSE_MAPPING
from sanic.exceptions import FileNotFound, HeaderNotFound, RangeNotSatisfiable
from sanic.handlers import ContentRangeHandler
from sanic.log import error_logger
from sanic.mixins.base import BaseMixin
from sanic.models.futures import FutureRoute, FutureStatic
from sanic.models.handler_types import RouteHandler
from sanic.response import HTTPResponse, file, file_stream, validate_file
from sanic.types import HashableDict
@@ -41,20 +31,14 @@ RouteWrapper = Callable[
]
class RouteMixin(metaclass=SanicMeta):
name: str
class RouteMixin(BaseMixin, metaclass=SanicMeta):
def __init__(self, *args, **kwargs) -> None:
self._future_routes: Set[FutureRoute] = set()
self._future_statics: Set[FutureStatic] = set()
self.strict_slashes: Optional[bool] = False
def _apply_route(self, route: FutureRoute) -> List[Route]:
raise NotImplementedError # noqa
def _apply_static(self, static: FutureStatic) -> Route:
raise NotImplementedError # noqa
def route(
self,
uri: str,
@@ -688,324 +672,6 @@ class RouteMixin(metaclass=SanicMeta):
**ctx_kwargs,
)(handler)
def static(
self,
uri: str,
file_or_directory: Union[str, bytes, PurePath],
pattern: str = r"/?.+",
use_modified_since: bool = True,
use_content_range: bool = False,
stream_large_files: bool = False,
name: str = "static",
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
content_type: Optional[bool] = None,
apply: bool = True,
resource_type: Optional[str] = None,
):
"""
Register a root to serve files from. The input can either be a
file or a directory. This method will enable an easy and simple way
to setup the :class:`Route` necessary to serve the static files.
:param uri: URL path to be used for serving static content
:param file_or_directory: Path for the Static file/directory with
static files
:param pattern: Regex Pattern identifying the valid static files
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the
:func:`StreamingHTTPResponse.file_stream` handler rather
than the :func:`HTTPResponse.file` handler to send the file.
If this is an integer, this represents the threshold size to
switch to :func:`StreamingHTTPResponse.file_stream`
:param name: user defined name used for url_for
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param content_type: user defined content type for header
:return: routes registered on the router
:rtype: List[sanic.router.Route]
"""
name = self._generate_name(name)
if strict_slashes is None and self.strict_slashes is not None:
strict_slashes = self.strict_slashes
if not isinstance(file_or_directory, (str, bytes, PurePath)):
raise ValueError(
f"Static route must be a valid path, not {file_or_directory}"
)
static = FutureStatic(
uri,
file_or_directory,
pattern,
use_modified_since,
use_content_range,
stream_large_files,
name,
host,
strict_slashes,
content_type,
resource_type,
)
self._future_statics.add(static)
if apply:
self._apply_static(static)
def _generate_name(self, *objects) -> str:
name = None
for obj in objects:
if obj:
if isinstance(obj, str):
name = obj
break
try:
name = obj.name
except AttributeError:
try:
name = obj.__name__
except AttributeError:
continue
else:
break
if not name: # noqa
raise ValueError("Could not generate a name for handler")
if not name.startswith(f"{self.name}."):
name = f"{self.name}.{name}"
return name
async def _get_file_path(self, file_or_directory, __file_uri__, not_found):
file_path_raw = Path(unquote(file_or_directory))
root_path = file_path = file_path_raw.resolve()
if __file_uri__:
# Strip all / that in the beginning of the URL to help prevent
# python from herping a derp and treating the uri as an
# absolute path
unquoted_file_uri = unquote(__file_uri__).lstrip("/")
file_path_raw = Path(file_or_directory, unquoted_file_uri)
file_path = file_path_raw.resolve()
if (
file_path < root_path and not file_path_raw.is_symlink()
) or ".." in file_path_raw.parts:
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
try:
file_path.relative_to(root_path)
except ValueError:
if not file_path_raw.is_symlink():
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
return file_path
async def _static_request_handler(
self,
file_or_directory,
use_modified_since,
use_content_range,
stream_large_files,
request,
content_type=None,
__file_uri__=None,
):
not_found = FileNotFound(
"File not found",
path=file_or_directory,
relative_url=__file_uri__,
)
# Merge served directory and requested file if provided
file_path = await self._get_file_path(
file_or_directory, __file_uri__, not_found
)
try:
headers = {}
# Check if the client has been sent this file before
# and it has not been modified since
stats = None
if use_modified_since:
stats = await stat_async(file_path)
modified_since = stats.st_mtime
response = await validate_file(request.headers, modified_since)
if response:
return response
headers["Last-Modified"] = formatdate(
modified_since, usegmt=True
)
_range = None
if use_content_range:
_range = None
if not stats:
stats = await stat_async(file_path)
headers["Accept-Ranges"] = "bytes"
headers["Content-Length"] = str(stats.st_size)
if request.method != "HEAD":
try:
_range = ContentRangeHandler(request, stats)
except HeaderNotFound:
pass
else:
del headers["Content-Length"]
headers.update(_range.headers)
if "content-type" not in headers:
content_type = (
content_type
or guess_type(file_path)[0]
or DEFAULT_HTTP_CONTENT_TYPE
)
if "charset=" not in content_type and (
content_type.startswith("text/")
or content_type == "application/javascript"
):
content_type += "; charset=utf-8"
headers["Content-Type"] = content_type
if request.method == "HEAD":
return HTTPResponse(headers=headers)
else:
if stream_large_files:
if type(stream_large_files) == int:
threshold = stream_large_files
else:
threshold = 1024 * 1024
if not stats:
stats = await stat_async(file_path)
if stats.st_size >= threshold:
return await file_stream(
file_path, headers=headers, _range=_range
)
return await file(file_path, headers=headers, _range=_range)
except RangeNotSatisfiable:
raise
except FileNotFoundError:
raise not_found
except Exception:
error_logger.exception(
f"Exception in static request handler: "
f"path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise
def _register_static(
self,
static: FutureStatic,
):
# TODO: Though sanic is not a file server, I feel like we should
# at least make a good effort here. Modified-since is nice, but
# we could also look into etags, expires, and caching
"""
Register a static directory handler with Sanic by adding a route to the
router and registering a handler.
:param app: Sanic
:param file_or_directory: File or directory path to serve from
:type file_or_directory: Union[str,bytes,Path]
:param uri: URL to serve from
:type uri: str
:param pattern: regular expression used to match files in the URL
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the
server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the file_stream() handler
rather than the file() handler to send the file
If this is an integer, this represents the
threshold size to switch to file_stream()
:param name: user defined name used for url_for
:type name: str
:param content_type: user defined content type for header
:return: registered static routes
:rtype: List[sanic.router.Route]
"""
if isinstance(static.file_or_directory, bytes):
file_or_directory = static.file_or_directory.decode("utf-8")
elif isinstance(static.file_or_directory, PurePath):
file_or_directory = str(static.file_or_directory)
elif not isinstance(static.file_or_directory, str):
raise ValueError("Invalid file path string.")
else:
file_or_directory = static.file_or_directory
uri = static.uri
name = static.name
# If we're not trying to match a file directly,
# serve from the folder
if not static.resource_type:
if not path.isfile(file_or_directory):
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "dir":
if path.isfile(file_or_directory):
raise TypeError(
"Resource type improperly identified as directory. "
f"'{file_or_directory}'"
)
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "file" and not path.isfile(
file_or_directory
):
raise TypeError(
"Resource type improperly identified as file. "
f"'{file_or_directory}'"
)
elif static.resource_type != "file":
raise ValueError(
"The resource_type should be set to 'file' or 'dir'"
)
# special prefix for static files
# if not static.name.startswith("_static_"):
# name = f"_static_{static.name}"
_handler = wraps(self._static_request_handler)(
partial(
self._static_request_handler,
file_or_directory,
static.use_modified_since,
static.use_content_range,
static.stream_large_files,
content_type=static.content_type,
)
)
route, _ = self.route( # type: ignore
uri=uri,
methods=["GET", "HEAD"],
name=name,
host=static.host,
strict_slashes=static.strict_slashes,
static=True,
)(_handler)
return route
def _determine_error_format(self, handler) -> str:
with suppress(OSError, TypeError):
src = dedent(getsource(handler))

View File

@@ -267,11 +267,11 @@ class StartupMixin(metaclass=SanicMeta):
if single_process and legacy:
raise RuntimeError("Cannot run single process and legacy mode")
if register_sys_signals is False and not (single_process or legacy):
raise RuntimeError(
"Cannot run Sanic.serve with register_sys_signals=False. "
"Use either Sanic.serve_single or Sanic.serve_legacy."
)
# if register_sys_signals is False and not (single_process or legacy):
# raise RuntimeError(
# "Cannot run Sanic.serve with register_sys_signals=False. "
# "Use either Sanic.serve_single or Sanic.serve_legacy."
# )
if motd_display:
self.config.MOTD_DISPLAY.update(motd_display)
@@ -1109,7 +1109,6 @@ class StartupMixin(metaclass=SanicMeta):
app: StartupMixin,
server_info: ApplicationServerInfo,
) -> None: # no cov
try:
# We should never get to this point without a server
# This is primarily to keep mypy happy

348
sanic/mixins/static.py Normal file
View File

@@ -0,0 +1,348 @@
from email.utils import formatdate
from functools import partial, wraps
from mimetypes import guess_type
from os import PathLike, path
from pathlib import Path, PurePath
from typing import Optional, Sequence, Set, Union, cast
from urllib.parse import unquote
from sanic_routing.route import Route
from sanic.base.meta import SanicMeta
from sanic.compat import stat_async
from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE
from sanic.exceptions import FileNotFound, HeaderNotFound, RangeNotSatisfiable
from sanic.handlers import ContentRangeHandler
from sanic.handlers.directory import DirectoryHandler
from sanic.log import deprecation, error_logger
from sanic.mixins.base import BaseMixin
from sanic.models.futures import FutureStatic
from sanic.request import Request
from sanic.response import HTTPResponse, file, file_stream, validate_file
class StaticMixin(BaseMixin, metaclass=SanicMeta):
def __init__(self, *args, **kwargs) -> None:
self._future_statics: Set[FutureStatic] = set()
def _apply_static(self, static: FutureStatic) -> Route:
raise NotImplementedError # noqa
def static(
self,
uri: str,
file_or_directory: Union[PathLike, str, bytes],
pattern: str = r"/?.+",
use_modified_since: bool = True,
use_content_range: bool = False,
stream_large_files: Union[bool, int] = False,
name: str = "static",
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
content_type: Optional[str] = None,
apply: bool = True,
resource_type: Optional[str] = None,
index: Optional[Union[str, Sequence[str]]] = None,
directory_view: bool = False,
directory_handler: Optional[DirectoryHandler] = None,
):
"""
Register a root to serve files from. The input can either be a
file or a directory. This method will enable an easy and simple way
to setup the :class:`Route` necessary to serve the static files.
:param uri: URL path to be used for serving static content
:param file_or_directory: Path for the Static file/directory with
static files
:param pattern: Regex Pattern identifying the valid static files
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the
:func:`StreamingHTTPResponse.file_stream` handler rather
than the :func:`HTTPResponse.file` handler to send the file.
If this is an integer, this represents the threshold size to
switch to :func:`StreamingHTTPResponse.file_stream`
:param name: user defined name used for url_for
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param content_type: user defined content type for header
:param apply: If true, will register the route immediately
:param resource_type: Explicitly declare a resource to be a "
file" or a "dir"
:param index: When exposing against a directory, index is the name that
will be served as the default file. When multiple files names are
passed, then they will be tried in order.
:param directory_view: Whether to fallback to showing the directory
viewer when exposing a directory
:param directory_handler: An instance of :class:`DirectoryHandler`
that can be used for explicitly controlling and subclassing the
behavior of the default directory handler
:return: routes registered on the router
:rtype: List[sanic.router.Route]
"""
name = self._generate_name(name)
if strict_slashes is None and self.strict_slashes is not None:
strict_slashes = self.strict_slashes
if not isinstance(file_or_directory, (str, bytes, PurePath)):
raise ValueError(
f"Static route must be a valid path, not {file_or_directory}"
)
if isinstance(file_or_directory, bytes):
deprecation(
"Serving a static directory with a bytes string is "
"deprecated and will be removed in v22.9.",
22.9,
)
file_or_directory = cast(str, file_or_directory.decode())
file_or_directory = Path(file_or_directory)
if directory_handler and (directory_view or index):
raise ValueError(
"When explicitly setting directory_handler, you cannot "
"set either directory_view or index. Instead, pass "
"these arguments to your DirectoryHandler instance."
)
if not directory_handler:
directory_handler = DirectoryHandler(
uri=uri,
directory=file_or_directory,
directory_view=directory_view,
index=index,
)
static = FutureStatic(
uri,
file_or_directory,
pattern,
use_modified_since,
use_content_range,
stream_large_files,
name,
host,
strict_slashes,
content_type,
resource_type,
directory_handler,
)
self._future_statics.add(static)
if apply:
self._apply_static(static)
class StaticHandleMixin(metaclass=SanicMeta):
def _apply_static(self, static: FutureStatic) -> Route:
return self._register_static(static)
def _register_static(
self,
static: FutureStatic,
):
# TODO: Though sanic is not a file server, I feel like we should
# at least make a good effort here. Modified-since is nice, but
# we could also look into etags, expires, and caching
"""
Register a static directory handler with Sanic by adding a route to the
router and registering a handler.
"""
if isinstance(static.file_or_directory, bytes):
file_or_directory = static.file_or_directory.decode("utf-8")
elif isinstance(static.file_or_directory, PurePath):
file_or_directory = str(static.file_or_directory)
elif not isinstance(static.file_or_directory, str):
raise ValueError("Invalid file path string.")
else:
file_or_directory = static.file_or_directory
uri = static.uri
name = static.name
# If we're not trying to match a file directly,
# serve from the folder
if not static.resource_type:
if not path.isfile(file_or_directory):
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "dir":
if path.isfile(file_or_directory):
raise TypeError(
"Resource type improperly identified as directory. "
f"'{file_or_directory}'"
)
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "file" and not path.isfile(
file_or_directory
):
raise TypeError(
"Resource type improperly identified as file. "
f"'{file_or_directory}'"
)
elif static.resource_type != "file":
raise ValueError(
"The resource_type should be set to 'file' or 'dir'"
)
# special prefix for static files
# if not static.name.startswith("_static_"):
# name = f"_static_{static.name}"
_handler = wraps(self._static_request_handler)(
partial(
self._static_request_handler,
file_or_directory=file_or_directory,
use_modified_since=static.use_modified_since,
use_content_range=static.use_content_range,
stream_large_files=static.stream_large_files,
content_type=static.content_type,
directory_handler=static.directory_handler,
)
)
route, _ = self.route( # type: ignore
uri=uri,
methods=["GET", "HEAD"],
name=name,
host=static.host,
strict_slashes=static.strict_slashes,
static=True,
)(_handler)
return route
async def _static_request_handler(
self,
request: Request,
*,
file_or_directory: PathLike,
use_modified_since: bool,
use_content_range: bool,
stream_large_files: Union[bool, int],
directory_handler: DirectoryHandler,
content_type: Optional[str] = None,
__file_uri__: Optional[str] = None,
):
not_found = FileNotFound(
"File not found",
path=file_or_directory,
relative_url=__file_uri__,
)
# Merge served directory and requested file if provided
file_path = await self._get_file_path(
file_or_directory, __file_uri__, not_found
)
try:
headers = {}
# Check if the client has been sent this file before
# and it has not been modified since
stats = None
if use_modified_since:
stats = await stat_async(file_path)
modified_since = stats.st_mtime
response = await validate_file(request.headers, modified_since)
if response:
return response
headers["Last-Modified"] = formatdate(
modified_since, usegmt=True
)
_range = None
if use_content_range:
_range = None
if not stats:
stats = await stat_async(file_path)
headers["Accept-Ranges"] = "bytes"
headers["Content-Length"] = str(stats.st_size)
if request.method != "HEAD":
try:
_range = ContentRangeHandler(request, stats)
except HeaderNotFound:
pass
else:
del headers["Content-Length"]
headers.update(_range.headers)
if "content-type" not in headers:
content_type = (
content_type
or guess_type(file_path)[0]
or DEFAULT_HTTP_CONTENT_TYPE
)
if "charset=" not in content_type and (
content_type.startswith("text/")
or content_type == "application/javascript"
):
content_type += "; charset=utf-8"
headers["Content-Type"] = content_type
if request.method == "HEAD":
return HTTPResponse(headers=headers)
else:
if stream_large_files:
if isinstance(stream_large_files, bool):
threshold = 1024 * 1024
else:
threshold = stream_large_files
if not stats:
stats = await stat_async(file_path)
if stats.st_size >= threshold:
return await file_stream(
file_path, headers=headers, _range=_range
)
return await file(file_path, headers=headers, _range=_range)
except (IsADirectoryError, PermissionError):
return await directory_handler.handle(request, request.path)
except RangeNotSatisfiable:
raise
except FileNotFoundError:
raise not_found
except Exception:
error_logger.exception(
"Exception in static request handler: "
f"path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise
async def _get_file_path(self, file_or_directory, __file_uri__, not_found):
file_path_raw = Path(unquote(file_or_directory))
root_path = file_path = file_path_raw.resolve()
if __file_uri__:
# Strip all / that in the beginning of the URL to help prevent
# python from herping a derp and treating the uri as an
# absolute path
unquoted_file_uri = unquote(__file_uri__).lstrip("/")
file_path_raw = Path(file_or_directory, unquoted_file_uri)
file_path = file_path_raw.resolve()
if (
file_path < root_path and not file_path_raw.is_symlink()
) or ".." in file_path_raw.parts:
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
try:
file_path.relative_to(root_path)
except ValueError:
if not file_path_raw.is_symlink():
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
return file_path

View File

@@ -1,6 +1,7 @@
from pathlib import PurePath
from pathlib import Path
from typing import Dict, Iterable, List, NamedTuple, Optional, Union
from sanic.handlers.directory import DirectoryHandler
from sanic.models.handler_types import (
ErrorMiddlewareType,
ListenerType,
@@ -46,16 +47,17 @@ class FutureException(NamedTuple):
class FutureStatic(NamedTuple):
uri: str
file_or_directory: Union[str, bytes, PurePath]
file_or_directory: Path
pattern: str
use_modified_since: bool
use_content_range: bool
stream_large_files: bool
stream_large_files: Union[bool, int]
name: str
host: Optional[str]
strict_slashes: Optional[bool]
content_type: Optional[bool]
content_type: Optional[str]
resource_type: Optional[str]
directory_handler: DirectoryHandler
class FutureSignal(NamedTuple):

0
sanic/pages/__init__.py Normal file
View File

52
sanic/pages/base.py Normal file
View File

@@ -0,0 +1,52 @@
from abc import ABC, abstractmethod
from html5tagger import HTML, Document
from sanic import __version__ as VERSION
from sanic.application.logo import SVG_LOGO_SIMPLE
from sanic.pages.css import CSS
class BasePage(ABC, metaclass=CSS): # no cov
TITLE = "Sanic"
CSS: str
def __init__(self, debug: bool = True) -> None:
self.doc = None
self.debug = debug
@property
def style(self) -> str:
return self.CSS
def render(self) -> str:
self.doc = Document(self.TITLE, lang="en")
self._head()
self._body()
self._foot()
return str(self.doc)
def _head(self) -> None:
self.doc.style(HTML(self.style))
with self.doc.header:
self.doc.div(self.TITLE)
def _foot(self) -> None:
with self.doc.footer:
self.doc.div("powered by")
with self.doc.div:
self._sanic_logo()
if self.debug:
self.doc.div(f"Version {VERSION}")
@abstractmethod
def _body(self) -> None:
...
def _sanic_logo(self) -> None:
self.doc.a(
HTML(SVG_LOGO_SIMPLE),
href="https://sanic.dev",
target="_blank",
referrerpolicy="no-referrer",
)

35
sanic/pages/css.py Normal file
View File

@@ -0,0 +1,35 @@
from abc import ABCMeta
from pathlib import Path
from typing import Optional
CURRENT_DIR = Path(__file__).parent
def _extract_style(maybe_style: Optional[str], name: str) -> str:
if maybe_style is not None:
maybe_path = Path(maybe_style)
if maybe_path.exists():
return maybe_path.read_text(encoding="UTF-8")
return maybe_style
maybe_path = CURRENT_DIR / "styles" / f"{name}.css"
if maybe_path.exists():
return maybe_path.read_text(encoding="UTF-8")
return ""
class CSS(ABCMeta):
"""Cascade stylesheets, i.e. combine all ancestor styles"""
def __new__(cls, name, bases, attrs):
Page = super().__new__(cls, name, bases, attrs)
# Use a locally defined STYLE or the one from styles directory
Page.STYLE = _extract_style(attrs.get("STYLE_FILE"), name)
Page.STYLE += attrs.get("STYLE_APPEND", "")
# Combine with all ancestor styles
Page.CSS = "".join(
Class.STYLE
for Class in reversed(Page.__mro__)
if type(Class) is CSS
)
return Page

View File

@@ -0,0 +1,66 @@
import sys
from typing import Dict, Iterable
from html5tagger import E
from .base import BasePage
if sys.version_info < (3, 8): # no cov
FileInfo = Dict
else:
from typing import TypedDict
class FileInfo(TypedDict):
icon: str
file_name: str
file_access: str
file_size: str
class DirectoryPage(BasePage): # no cov
TITLE = "Directory Viewer"
def __init__(
self, files: Iterable[FileInfo], url: str, debug: bool
) -> None:
super().__init__(debug)
self.files = files
self.url = url
def _body(self) -> None:
with self.doc.main:
self._headline()
files = list(self.files)
if files:
self._file_table(files)
else:
self.doc.p("The folder is empty.")
def _headline(self):
"""Implement a heading with the current path, combined with
breadcrumb links"""
with self.doc.h1(id="breadcrumbs"):
p = self.url.split("/")[:-1]
for i, part in enumerate(p):
path = "/".join(p[: i + 1]) + "/"
with self.doc.a(href=path):
self.doc.span(part, class_="dir").span("/", class_="sep")
def _file_table(self, files: Iterable[FileInfo]):
with self.doc.table(class_="autoindex container"):
for f in files:
self._file_row(**f)
def _file_row(
self,
icon: str,
file_name: str,
file_access: str,
file_size: str,
):
first = E.span(icon, class_="icon").a(file_name, href=file_name)
self.doc.tr.td(first).td(file_size).td(file_access)

105
sanic/pages/error.py Normal file
View File

@@ -0,0 +1,105 @@
from typing import Any, Mapping
import tracerite.html
from html5tagger import E
from tracerite import html_traceback, inspector
from sanic.request import Request
from .base import BasePage
# Avoid showing the request in the traceback variable inspectors
inspector.blacklist_types += (Request,)
ENDUSER_TEXT = """We're sorry, but it looks like something went wrong. Please try refreshing the page or navigating back to the homepage. If the issue persists, our technical team is working to resolve it as soon as possible. We apologize for the inconvenience and appreciate your patience.""" # noqa: E501
class ErrorPage(BasePage):
STYLE_APPEND = tracerite.html.style
def __init__(
self,
title: str,
text: str,
request: Request,
exc: Exception,
full: bool,
) -> None:
super().__init__()
# Internal server errors come with the text of the exception,
# which we don't want to show to the user.
# FIXME: Needs to be done in a better way, elsewhere
if "Internal Server Error" in title:
text = "The application encountered an unexpected error and could not continue." # noqa: E501
name = request.app.name.replace("_", " ").strip()
if name.islower():
name = name.title()
self.TITLE = E("Application ").strong(name)(
" cannot handle your request"
)
self.title = title
self.text = text
self.request = request
self.exc = exc
self.full = full
def _head(self) -> None:
self.doc._script(tracerite.html.javascript)
super()._head()
def _body(self) -> None:
debug = self.request.app.debug
try:
route_name = self.request.route.name
except AttributeError:
route_name = "[route not found]"
with self.doc.main:
self.doc.h1(f"⚠️ {self.title}").p(self.text)
# Show context details if available on the exception
context = getattr(self.exc, "context", None)
if context:
self._key_value_table(
"Issue context", "exception-context", context
)
if not debug:
with self.doc.div(id="enduser"):
self.doc.p(ENDUSER_TEXT).p.a("Front Page", href="/")
return
# Show additional details in debug mode,
# open by default for 500 errors
with self.doc.details(open=self.full, class_="smalltext"):
# Show extra details if available on the exception
extra = getattr(self.exc, "extra", None)
if extra:
self._key_value_table(
"Issue extra data", "exception-extra", extra
)
self.doc.summary(
"Details for developers (Sanic debug mode only)"
)
if self.exc:
self.doc.h2(f"Exception in {route_name}:")
self.doc(html_traceback(self.exc, include_js_css=False))
self._key_value_table(
f"{self.request.method} {self.request.path}",
"request-headers",
self.request.headers,
)
def _key_value_table(
self, title: str, table_id: str, data: Mapping[str, Any]
) -> None:
self.doc.h2(title)
with self.doc.dl(id=table_id, class_="key-value-table smalltext"):
for key, value in data.items():
# Reading values may cause a new exception, so suppress it
try:
value = str(value)
except Exception:
value = E.em("Unable to display value")
self.doc.dt.span(key, class_="nobr key").span(": ").dd(value)

View File

@@ -0,0 +1,128 @@
/** BasePage **/
:root {
--sanic: #ff0d68;
--sanic-blue: #0092FF;
--sanic-yellow: #FFE900;
--sanic-purple: #833FE3;
--sanic-green: #37ae6f;
--sanic-background: #f1f5f9;
--sanic-text: #1f2937;
--sanic-tab-background: #fff;
--sanic-tab-text: #0f172a;
--sanic-tab-shadow: #adadad;
--sanic-highlight-background: var(--sanic-yellow);
--sanic-highlight-text: var(--sanic-text);
--sanic-header-background: #000;
}
@media (prefers-color-scheme: dark) {
:root {
--sanic-purple: #D246DE;
--sanic-green: #16DB93;
--sanic-background: #111;
--sanic-text: #e7e7e7;
--sanic-tab-background: #484848;
--sanic-tab-text: #e1e1e1;
--sanic-tab-shadow: #000;
--sanic-highlight-background: var(--sanic-yellow);
--sanic-highlight-text: #000;
--sanic-header-background: #000;
}
}
html {
font: 16px sans-serif;
background: var(--sanic-background);
color: var(--sanic-text);
scrollbar-gutter: stable;
overflow: hidden auto;
}
body {
margin: 0;
font-size: 1.25rem;
line-height: 125%;
}
body>* {
padding: 1rem 2vw;
}
@media (max-width: 1000px) {
body>* {
padding: 0.5rem 1.5vw;
}
html {
/* Scale everything by rem of 6px-16px by viewport width */
font-size: calc(6px + 10 * 100vw / 1000);
}
}
main {
min-height: 70vh;
/* Make sure the footer is closer to bottom */
padding: 1rem 2.5rem;
/* Generous padding for readability */
}
.smalltext {
font-size: 1.0rem;
}
.container {
min-width: 600px;
max-width: 1600px;
}
header {
background: #111;
color: #e1e1e1;
border-bottom: 1px solid #272727;
text-align: center;
}
footer {
text-align: center;
display: flex;
flex-direction: column;
font-size: 0.8rem;
margin-top: 2rem;
}
h1 {
text-align: left;
}
a:visited {
color: inherit;
}
a {
text-decoration: none;
color: #88f;
}
a:hover,
a:focus {
text-decoration: underline;
outline: none;
}
span.icon {
margin-right: 1rem;
}
#logo-simple {
height: 1.75rem;
padding: 0 0.25rem;
}
@media (prefers-color-scheme: dark) {
#logo-simple path:last-child {
fill: #e1e1e1;
}
}

View File

@@ -0,0 +1,63 @@
/** DirectoryPage **/
#breadcrumbs>a:hover {
text-decoration: none;
}
#breadcrumbs>a .dir {
padding: 0 0.25em;
}
#breadcrumbs>a:first-child:hover::before,
#breadcrumbs>a .dir:hover {
text-decoration: underline;
}
#breadcrumbs>a:first-child::before {
content: "🏠";
}
#breadcrumbs>a:last-child {
color: #ff0d68;
}
main a {
color: inherit;
font-weight: bold;
}
table.autoindex {
width: 100%;
font-family: monospace;
font-size: 1.25rem;
}
table.autoindex tr {
display: flex;
}
table.autoindex tr:hover {
background-color: #ddd;
}
table.autoindex td {
margin: 0 0.5rem;
}
table.autoindex td:first-child {
flex: 1;
}
table.autoindex td:nth-child(2) {
text-align: right;
}
table.autoindex td:last-child {
text-align: right;
}
@media (prefers-color-scheme: dark) {
table.autoindex tr:hover {
background-color: #222;
}
}

View File

@@ -0,0 +1,105 @@
/** ErrorPage **/
#enduser {
max-width: 30em;
margin: 5em auto 5em auto;
text-align: justify;
/*text-justify: both;*/
}
#enduser a {
color: var(--sanic-blue);
}
#enduser p:last-child {
text-align: right;
}
summary {
margin-top: 3em;
color: var(--sanic-blue);
cursor: pointer;
}
.tracerite {
--tracerite-var: var(--sanic-blue);
--tracerite-val: var(--sanic-text);
--tracerite-type: var(--sanic-green);
--tracerite-exception: var(--sanic);
--tracerite-highlight: var(--sanic-yellow);
--tracerite-tab: var(--sanic-tab-background);
--tracerite-tab-text: var(--sanic-tab-text);
}
.tracerite>h3 {
margin: 0.5rem 0 !important;
}
.tracerite .traceback-labels button {
font-size: 0.8rem !important;
line-height: 120% !important;
background: var(--tracerite-tab) !important;
color: var(--tracerite-tab-text) !important;
transition: 0.3s;
cursor: pointer;
}
.tracerite .traceback-labels {
padding-top: 5px;
}
.tracerite .traceback-labels button:hover {
filter: contrast(150%) brightness(120%) drop-shadow(0 -0 2px var(--sanic-tab-shadow));
}
.tracerite .traceback-details mark span {
background: var(--sanic-highlight-background) !important;
color: var(--sanic-highlight-text) !important;
}
header {
background: var(--sanic-header-background);
}
h1 {
/*margin-left: -1.5rem; !* Emoji partially in the left margin *!*/
}
h2 {
margin: 1em 0 0.2em 0;
font-size: 1.25rem;
color: var(--sanic-text);
}
dl.key-value-table {
width: 100%;
margin: 0;
display: grid;
grid-template-columns: 1fr 5fr;
grid-gap: .3em;
white-space: pre-wrap;
}
dl.key-value-table * {
margin: 0;
}
dl.key-value-table dt {
color: #888;
word-break: break-word;
}
dl.key-value-table dd {
word-break: break-all;
/* Better breaking for cookies header and such */
}
.tracerite .codeline {
font-family:
"Fira Code",
"Source Code Pro",
Menlo,
Monaco,
Consolas,
Lucida Console,
monospace;
}

View File

@@ -146,7 +146,6 @@ class Request:
head: bytes = b"",
stream_id: int = 0,
):
self.raw_url = url_bytes
try:
self._parsed_url = parse_url(url_bytes)

View File

@@ -39,13 +39,13 @@ class Router(BaseRouter):
extra={"host": host} if host else None,
)
except RoutingNotFound as e:
raise NotFound("Requested URL {} not found".format(e.path))
raise NotFound(f"Requested URL {e.path} not found") from None
except NoMethod as e:
raise MethodNotAllowed(
"Method {} not allowed for URL {}".format(method, path),
f"Method {method} not allowed for URL {path}",
method=method,
allowed_methods=e.allowed_methods,
)
) from None
@lru_cache(maxsize=ROUTER_CACHE_SIZE)
def get( # type: ignore
@@ -61,6 +61,7 @@ class Router(BaseRouter):
correct response
:rtype: Tuple[ Route, RouteHandler, Dict[str, Any]]
"""
__tracebackhide__ = True
return self._get(path, method, host)
def add( # type: ignore

View File

@@ -94,7 +94,6 @@ def watchdog(sleep_interval, reload_dirs):
try:
while True:
changed = set()
for filename in itertools.chain(
_iter_module_files(),

View File

@@ -130,13 +130,14 @@ def _setup_system_signals(
register_sys_signals: bool,
loop: asyncio.AbstractEventLoop,
) -> None: # no cov
print(">>>>>>>>>>>>>>>>>.", run_multiple)
# Ignore SIGINT when run_multiple
if run_multiple:
signal_func(SIGINT, SIG_IGN)
os.environ["SANIC_WORKER_PROCESS"] = "true"
# Register signals for graceful termination
if register_sys_signals:
if register_sys_signals and False:
if OS_IS_WINDOWS:
ctrlc_workaround_for_windows(app)
else:

View File

@@ -45,7 +45,7 @@ class WebSocketConnection:
await self._send(message)
async def recv(self, *args, **kwargs) -> Optional[str]:
async def recv(self, *args, **kwargs) -> Optional[Union[str, bytes]]:
message = await self._receive()
if message["type"] == "websocket.receive":
@@ -53,7 +53,7 @@ class WebSocketConnection:
return message["text"]
except KeyError:
try:
return message["bytes"].decode()
return message["bytes"]
except KeyError:
raise InvalidUsage("Bad ASGI message received")
elif message["type"] == "websocket.disconnect":

View File

@@ -52,7 +52,6 @@ class WebsocketFrameAssembler:
paused: bool
def __init__(self, protocol) -> None:
self.protocol = protocol
self.read_mutex = asyncio.Lock()

View File

@@ -686,7 +686,6 @@ class WebsocketImplProtocol:
:raises TypeError: for unsupported inputs
"""
async with self.conn_mutex:
if self.ws_proto.state in (CLOSED, CLOSING):
raise WebsocketClosed(
"Cannot write to websocket interface after it is closed."

View File

@@ -2,7 +2,6 @@ from pathlib import Path
from sanic import Sanic
from sanic.exceptions import SanicException
from sanic.response import redirect
def create_simple_server(directory: Path):
@@ -12,10 +11,8 @@ def create_simple_server(directory: Path):
)
app = Sanic("SimpleServer")
app.static("/", directory, name="main")
@app.get("/")
def index(_):
return redirect(app.url_for("main", filename="index.html"))
app.static(
"/", directory, name="main", directory_view=True, index="index.html"
)
return app

View File

@@ -11,7 +11,6 @@ class TouchUpMeta(SanicMeta):
methods = attrs.get("__touchup__")
attrs["__touched__"] = False
if methods:
for method in methods:
if method not in attrs:
raise SanicException(

View File

@@ -75,7 +75,6 @@ def load_module_from_file_location(
location = location.decode(encoding)
if isinstance(location, Path) or "/" in location or "$" in location:
if not isinstance(location, Path):
# A) Check if location contains any environment variables
# in format ${some_env_var}.

View File

@@ -35,6 +35,7 @@ def open_local(paths, mode="r", encoding="utf8"):
return codecs.open(path, mode, encoding)
def str_to_bool(val: str) -> bool:
val = val.lower()
if val in {
@@ -55,6 +56,7 @@ def str_to_bool(val: str) -> bool:
else:
raise ValueError(f"Invalid truth value {val}")
with open_local(["sanic", "__version__.py"], encoding="latin1") as fp:
try:
version = re.findall(
@@ -79,7 +81,7 @@ setup_kwargs = {
),
"long_description": long_description,
"packages": find_packages(exclude=("tests", "tests.*")),
"package_data": {"sanic": ["py.typed"]},
"package_data": {"sanic": ["py.typed", "pages/styles/*"]},
"platforms": "any",
"python_requires": ">=3.7",
"classifiers": [
@@ -109,6 +111,8 @@ requirements = [
"aiofiles>=0.6.0",
"websockets>=10.0",
"multidict>=5.0,<7.0",
"html5tagger>=1.2.1",
"tracerite>=1.0.0",
]
tests_require = [

View File

@@ -1,5 +1,7 @@
import asyncio
import inspect
import logging
import os
import random
import re
import string
@@ -58,7 +60,6 @@ CACHE: Dict[str, Any] = {}
class RouteStringGenerator:
ROUTE_COUNT_PER_DEPTH = 100
HTTP_METHODS = HTTP_METHODS
ROUTE_PARAM_TYPES = ["str", "int", "float", "alpha", "uuid"]
@@ -232,3 +233,12 @@ def urlopen():
urlopen.read = Mock()
with patch("sanic.cli.inspector_client.urlopen", urlopen):
yield urlopen
@pytest.fixture(scope="module")
def static_file_directory():
"""The static directory to serve"""
current_file = inspect.getfile(inspect.currentframe())
current_directory = os.path.dirname(os.path.abspath(current_file))
static_directory = os.path.join(current_directory, "static")
return static_directory

View File

@@ -36,6 +36,7 @@ def test_app_loop_running(app: Sanic):
assert response.text == "pass"
@pytest.mark.asyncio
def test_create_asyncio_server(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(return_asyncio_server=True)
@@ -44,6 +45,7 @@ def test_create_asyncio_server(app: Sanic):
assert srv.is_serving() is True
@pytest.mark.asyncio
def test_asyncio_server_no_start_serving(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
@@ -55,6 +57,7 @@ def test_asyncio_server_no_start_serving(app: Sanic):
assert srv.is_serving() is False
@pytest.mark.asyncio
def test_asyncio_server_start_serving(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
@@ -72,6 +75,7 @@ def test_asyncio_server_start_serving(app: Sanic):
# Looks like we can't easily test `serve_forever()`
@pytest.mark.asyncio
def test_create_server_main(app: Sanic, caplog):
app.listener("main_process_start")(lambda *_: ...)
loop = asyncio.get_event_loop()
@@ -86,6 +90,7 @@ def test_create_server_main(app: Sanic, caplog):
) in caplog.record_tuples
@pytest.mark.asyncio
def test_create_server_no_startup(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
@@ -101,6 +106,7 @@ def test_create_server_no_startup(app: Sanic):
loop.run_until_complete(srv.start_serving())
@pytest.mark.asyncio
def test_create_server_main_convenience(app: Sanic, caplog):
app.main_process_start(lambda *_: ...)
loop = asyncio.get_event_loop()
@@ -126,7 +132,6 @@ def test_app_loop_not_running(app: Sanic):
def test_app_run_raise_type_error(app: Sanic):
with pytest.raises(TypeError) as excinfo:
app.run(loop="loop")
@@ -139,7 +144,6 @@ def test_app_run_raise_type_error(app: Sanic):
def test_app_route_raise_value_error(app: Sanic):
with pytest.raises(ValueError) as excinfo:
@app.route("/test")
@@ -221,7 +225,6 @@ def test_app_websocket_parameters(websocket_protocol_mock, app: Sanic):
def test_handle_request_with_nested_exception(app: Sanic, monkeypatch):
err_msg = "Mock Exception"
def mock_error_handler_response(*args, **kwargs):
@@ -241,7 +244,6 @@ def test_handle_request_with_nested_exception(app: Sanic, monkeypatch):
def test_handle_request_with_nested_exception_debug(app: Sanic, monkeypatch):
err_msg = "Mock Exception"
def mock_error_handler_response(*args, **kwargs):
@@ -470,6 +472,7 @@ def test_uvloop_config(app: Sanic, monkeypatch, use):
try_use_uvloop.assert_not_called()
@pytest.mark.asyncio
def test_uvloop_cannot_never_called_with_create_server(caplog, monkeypatch):
apps = (Sanic("default-uvloop"), Sanic("no-uvloop"), Sanic("yes-uvloop"))
@@ -506,6 +509,7 @@ def test_uvloop_cannot_never_called_with_create_server(caplog, monkeypatch):
assert counter[(logging.WARNING, message)] == modified
@pytest.mark.asyncio
def test_multiple_uvloop_configs_display_warning(caplog):
Sanic._uvloop_setting = None # Reset the setting (changed in prev tests)

View File

@@ -342,7 +342,7 @@ async def test_websocket_send(send, receive, message_stack):
@pytest.mark.asyncio
async def test_websocket_receive(send, receive, message_stack):
async def test_websocket_text_receive(send, receive, message_stack):
msg = {"text": "hello", "type": "websocket.receive"}
message_stack.append(msg)
@@ -351,6 +351,15 @@ async def test_websocket_receive(send, receive, message_stack):
assert text == msg["text"]
@pytest.mark.asyncio
async def test_websocket_bytes_receive(send, receive, message_stack):
msg = {"bytes": b"hello", "type": "websocket.receive"}
message_stack.append(msg)
ws = WebSocketConnection(send, receive)
data = await ws.receive()
assert data == msg["bytes"]
@pytest.mark.asyncio
async def test_websocket_accept_with_no_subprotocols(

View File

@@ -148,7 +148,6 @@ def test_cookie_set_unknown_property():
def test_cookie_set_same_key(app):
cookies = {"test": "wait"}
@app.get("/")

View File

@@ -62,7 +62,6 @@ def exception_handler_app():
@exception_handler_app.route("/8", error_format="html")
def handler_8(request):
raise ErrorWithRequestCtx("OK")
@exception_handler_app.exception(ErrorWithRequestCtx, NotFound)
@@ -214,7 +213,7 @@ def test_error_handler_noisy_log(
exception_handler_app: Sanic, monkeypatch: MonkeyPatch
):
err_logger = Mock()
monkeypatch.setattr(handlers, "error_logger", err_logger)
monkeypatch.setattr(handlers.error, "error_logger", err_logger)
exception_handler_app.config["NOISY_EXCEPTIONS"] = False
exception_handler_app.test_client.get("/1")

View File

@@ -514,7 +514,6 @@ def test_file_stream_head_response(
def test_file_stream_response_range(
app: Sanic, file_name, static_file_directory, size, start, end
):
Range = namedtuple("Range", ["size", "start", "end", "total"])
total = len(get_file_content(static_file_directory, file_name))
range = Range(size=size, start=start, end=end, total=total)

View File

@@ -722,7 +722,6 @@ def test_add_webscoket_route_with_version(app):
def test_route_duplicate(app):
with pytest.raises(RouteExists):
@app.route("/test")
@@ -819,7 +818,6 @@ def test_unquote_add_route(app, unquote):
def test_dynamic_add_route(app):
results = []
async def handler(request, name):
@@ -834,7 +832,6 @@ def test_dynamic_add_route(app):
def test_dynamic_add_route_string(app):
results = []
async def handler(request, name):
@@ -938,7 +935,6 @@ def test_dynamic_add_route_unhashable(app):
def test_add_route_duplicate(app):
with pytest.raises(RouteExists):
async def handler1(request):
@@ -1120,7 +1116,6 @@ def test_route_raise_ParameterNameConflicts(app):
def test_route_invalid_host(app):
host = 321
with pytest.raises(ValueError) as excinfo:

View File

@@ -93,6 +93,7 @@ def test_dont_register_system_signals(app):
@pytest.mark.skipif(os.name == "nt", reason="windows cannot SIGINT processes")
def test_windows_workaround():
"""Test Windows workaround (on any other OS)"""
# At least some code coverage, even though this test doesn't work on
# Windows...
class MockApp:

View File

@@ -1,4 +1,3 @@
import inspect
import logging
import os
import sys
@@ -13,15 +12,6 @@ from sanic import Sanic, text
from sanic.exceptions import FileNotFound
@pytest.fixture(scope="module")
def static_file_directory():
"""The static directory to serve"""
current_file = inspect.getfile(inspect.currentframe())
current_directory = os.path.dirname(os.path.abspath(current_file))
static_directory = os.path.join(current_directory, "static")
return static_directory
@pytest.fixture(scope="module")
def double_dotted_directory_file(static_file_directory: str):
"""Generate double dotted directory and its files"""
@@ -118,7 +108,12 @@ def test_static_file_pathlib(app, static_file_directory, file_name):
def test_static_file_bytes(app, static_file_directory, file_name):
bsep = os.path.sep.encode("utf-8")
file_path = static_file_directory.encode("utf-8") + bsep + file_name
app.static("/testing.file", file_path)
message = (
"Serving a static directory with a bytes "
"string is deprecated and will be removed in v22.9."
)
with pytest.warns(DeprecationWarning, match=message):
app.static("/testing.file", file_path)
request, response = app.test_client.get("/testing.file")
assert response.status == 200
@@ -431,7 +426,6 @@ def test_static_stream_large_file(
"file_name", ["test.file", "decode me.txt", "python.png"]
)
def test_use_modified_since(app, static_file_directory, file_name):
file_stat = os.stat(get_file_path(static_file_directory, file_name))
modified_since = strftime(
"%a, %d %b %Y %H:%M:%S GMT", gmtime(file_stat.st_mtime)

View File

@@ -0,0 +1,123 @@
import os
from pathlib import Path
import pytest
from sanic import Sanic
from sanic.handlers.directory import DirectoryHandler
def get_file_path(static_file_directory, file_name):
return os.path.join(static_file_directory, file_name)
def get_file_content(static_file_directory, file_name):
"""The content of the static file to check"""
with open(get_file_path(static_file_directory, file_name), "rb") as file:
return file.read()
def test_static_directory_view(app: Sanic, static_file_directory: str):
app.static("/static", static_file_directory, directory_view=True)
_, response = app.test_client.get("/static/")
assert response.status == 200
assert response.content_type == "text/html; charset=utf-8"
assert "<title>Directory Viewer</title>" in response.text
def test_static_index_single(app: Sanic, static_file_directory: str):
app.static("/static", static_file_directory, index="test.html")
_, response = app.test_client.get("/static/")
assert response.status == 200
assert response.body == get_file_content(
static_file_directory, "test.html"
)
assert response.headers["Content-Type"] == "text/html"
def test_static_index_single_not_found(app: Sanic, static_file_directory: str):
app.static("/static", static_file_directory, index="index.html")
_, response = app.test_client.get("/static/")
assert response.status == 404
def test_static_index_multiple(app: Sanic, static_file_directory: str):
app.static(
"/static",
static_file_directory,
index=["index.html", "test.html"],
)
_, response = app.test_client.get("/static/")
assert response.status == 200
assert response.body == get_file_content(
static_file_directory, "test.html"
)
assert response.headers["Content-Type"] == "text/html"
def test_static_directory_view_and_index(
app: Sanic, static_file_directory: str
):
app.static(
"/static",
static_file_directory,
directory_view=True,
index="foo.txt",
)
_, response = app.test_client.get("/static/nested/")
assert response.status == 200
assert response.content_type == "text/html; charset=utf-8"
assert "<title>Directory Viewer</title>" in response.text
_, response = app.test_client.get("/static/nested/dir/")
assert response.status == 200
assert response.body == get_file_content(
f"{static_file_directory}/nested/dir", "foo.txt"
)
assert response.content_type == "text/plain"
def test_static_directory_handler(app: Sanic, static_file_directory: str):
dh = DirectoryHandler(
"/static",
Path(static_file_directory),
directory_view=True,
index="foo.txt",
)
app.static("/static", static_file_directory, directory_handler=dh)
_, response = app.test_client.get("/static/nested/")
assert response.status == 200
assert response.content_type == "text/html; charset=utf-8"
assert "<title>Directory Viewer</title>" in response.text
_, response = app.test_client.get("/static/nested/dir/")
assert response.status == 200
assert response.body == get_file_content(
f"{static_file_directory}/nested/dir", "foo.txt"
)
assert response.content_type == "text/plain"
def test_static_directory_handler_fails(app: Sanic):
dh = DirectoryHandler(
"/static",
Path(""),
directory_view=True,
index="foo.txt",
)
message = (
"When explicitly setting directory_handler, you cannot "
"set either directory_view or index. Instead, pass "
"these arguments to your DirectoryHandler instance."
)
with pytest.raises(ValueError, match=message):
app.static("/static", "", directory_handler=dh, directory_view=True)
with pytest.raises(ValueError, match=message):
app.static("/static", "", directory_handler=dh, index="index.html")

View File

@@ -654,7 +654,6 @@ def test_sanic_ssl_context_create():
reason="This test requires fork context",
)
def test_ssl_in_multiprocess_mode(app: Sanic, caplog):
ssl_dict = {"cert": localhost_cert, "key": localhost_key}
event = Event()

View File

@@ -176,7 +176,6 @@ def handler(request: Request):
async def client(app: Sanic, loop: AbstractEventLoop):
try:
transport = httpx.AsyncHTTPTransport(uds=SOCKPATH)
async with httpx.AsyncClient(transport=transport) as client:
r = await client.get("http://myhost.invalid/")

View File

@@ -83,7 +83,6 @@ def test_simple_url_for_getting_with_more_params(app, args, url):
def test_url_for_with_server_name(app):
server_name = f"{test_host}:{test_port}"
app.config.update({"SERVER_NAME": server_name})
path = "/myurl"

View File

@@ -38,7 +38,6 @@ def test_load_module_from_file_location_with_non_existing_env_variable():
LoadFileException,
match="The following environment variables are not set: MuuMilk",
):
load_module_from_file_location("${MuuMilk}")