Compare commits

..

12 Commits

Author SHA1 Message Date
Adam Hopkins
c695c5250a squash 2023-03-09 10:37:41 +02:00
Adam Hopkins
23c1eaab29 POC for compatibility mode 2023-03-09 09:54:32 +02:00
L. Kärkkäinen
a5d7d03413 Nicer traceback formatting (#2667)
Co-authored-by: L. Kärkkäinen <98187+Tronic@users.noreply.github.com>
Co-authored-by: Adam Hopkins <adam@amhopkins.com>
Co-authored-by: L. Karkkainen <tronic@users.noreply.github.com>
Co-authored-by: SML <smlbiobot@gmail.com>
2023-03-06 21:24:12 +02:00
L. Kärkkäinen
259e458847 Simplified parse_content_header escaping (#2707) 2023-03-06 06:39:16 +02:00
Adam Hopkins
cb49c2b26d Add header accessors (#2696) 2023-02-28 00:26:53 +02:00
L. Kärkkäinen
dfc0704831 Error page rendering format selection (#2668)
Co-authored-by: Adam Hopkins <adam@amhopkins.com>
Co-authored-by: L. Karkkainen <tronic@users.noreply.github.com>
2023-02-26 08:25:10 +02:00
Adam Hopkins
d238995f1b Refresh Request.accept functionality (#2687) 2023-02-21 08:22:51 +02:00
Mohammad Almoghrabi
6f5303e080 check the status of socket before shutting down (#2680)
* check the status of socket before shutting down

* remove socket status checking & ignore OSError exception
2023-02-14 22:59:41 +02:00
Ryu Juheon
5e7f6998bd fix(websocket): ASGI websocket must pass thru bytes as is (#2651) 2023-02-05 16:41:54 +02:00
L. Kärkkäinen
c7a71cd00c Use FALLBACK_ERROR_FORMAT for handlers that return empty() (#2659)
Co-authored-by: L. Karkkainen <tronic@users.noreply.github.com>
Co-authored-by: Adam Hopkins <adam@amhopkins.com>
2023-02-05 15:29:01 +02:00
Adam Hopkins
9cb9e88678 Establish basic file browser and index fallback (#2662)
Co-authored-by: L. Kärkkäinen <98187+Tronic@users.noreply.github.com>
Co-authored-by: L. Karkkainen <tronic@users.noreply.github.com>
2023-02-05 15:09:04 +02:00
Rodolfo Olivieri
30c53b6857 Remove deprecated property in blueprint (#2666)
Fixes https://github.com/sanic-org/sanic/issues/2442
2023-01-30 09:26:55 +02:00
59 changed files with 3258 additions and 2183 deletions

View File

@@ -9,6 +9,7 @@ omit =
sanic/simple.py
sanic/utils.py
sanic/cli
sanic/pages
[html]
directory = coverage

View File

@@ -17,7 +17,8 @@ ignore:
- "sanic/compat.py"
- "sanic/simple.py"
- "sanic/utils.py"
- "sanic/cli"
- "sanic/cli/"
- "sanic/pages/"
- ".github/"
- "changelogs/"
- "docker/"

View File

@@ -23,5 +23,7 @@ module = [
"trustme.*",
"sanic_routing.*",
"aioquic.*",
"html5tagger.*",
"tracerite.*",
]
ignore_missing_imports = true

View File

@@ -1 +1,10 @@
__version__ = "22.12.0"
__version__ = "23.3.0"
__compatibility__ = "22.12"
from inspect import currentframe, stack
for frame_info in stack():
if frame_info.frame is not currentframe():
value = frame_info.frame.f_globals.get("__SANIC_COMPATIBILITY__")
if value:
__compatibility__ = value

View File

@@ -72,6 +72,7 @@ from sanic.log import (
from sanic.middleware import Middleware, MiddlewareLocation
from sanic.mixins.listeners import ListenerEvent
from sanic.mixins.startup import StartupMixin
from sanic.mixins.static import StaticHandleMixin
from sanic.models.futures import (
FutureException,
FutureListener,
@@ -79,7 +80,6 @@ from sanic.models.futures import (
FutureRegistry,
FutureRoute,
FutureSignal,
FutureStatic,
)
from sanic.models.handler_types import ListenerType, MiddlewareType
from sanic.models.handler_types import Sanic as SanicVar
@@ -106,7 +106,7 @@ if OS_IS_WINDOWS: # no cov
enable_windows_color_support()
class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
class Sanic(StaticHandleMixin, BaseSanic, StartupMixin, metaclass=TouchUpMeta):
"""
The main application instance
"""
@@ -441,9 +441,6 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
return routes
def _apply_static(self, static: FutureStatic) -> Route:
return self._register_static(static)
def _apply_middleware(
self,
middleware: FutureMiddleware,
@@ -878,6 +875,8 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
:param request: HTTP Request object
:return: Nothing
"""
__tracebackhide__ = True
await self.dispatch(
"http.lifecycle.handle",
inline=True,
@@ -890,6 +889,7 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
Union[
BaseHTTPResponse,
Coroutine[Any, Any, Optional[BaseHTTPResponse]],
ResponseStream,
]
] = None
run_middleware = True
@@ -998,7 +998,7 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
...
await response.send(end_stream=True)
elif isinstance(response, ResponseStream):
resp = await response(request) # type: ignore
resp = await response(request)
await self.dispatch(
"http.lifecycle.response",
inline=True,
@@ -1007,7 +1007,7 @@ class Sanic(BaseSanic, StartupMixin, metaclass=TouchUpMeta):
"response": resp,
},
)
await response.eof() # type: ignore
await response.eof()
else:
if not hasattr(handler, "is_websocket"):
raise ServerError(

View File

@@ -40,6 +40,8 @@ FULL_COLOR_LOGO = """
""" # noqa
SVG_LOGO_SIMPLE = """<svg id=logo-simple viewBox="0 0 964 279"><desc>Sanic</desc><path d="M107 222c9-2 10-20 1-22s-20-2-30-2-17 7-16 14 6 10 15 10h30zm115-1c16-2 30-11 35-23s6-24 2-33-6-14-15-20-24-11-38-10c-7 3-10 13-5 19s17-1 24 4 15 14 13 24-5 15-14 18-50 0-74 0h-17c-6 4-10 15-4 20s16 2 23 3zM251 83q9-1 9-7 0-15-10-16h-13c-10 6-10 20 0 22zM147 60c-4 0-10 3-11 11s5 13 10 12 42 0 67 0c5-3 7-10 6-15s-4-8-9-8zm-33 1c-8 0-16 0-24 3s-20 10-25 20-6 24-4 36 15 22 26 27 78 8 94 3c4-4 4-12 0-18s-69 8-93-10c-8-7-9-23 0-30s12-10 20-10 12 2 16-3 1-15-5-18z" fill="#ff0d68"/><path d="M676 74c0-14-18-9-20 0s0 30 0 39 20 9 20 2zm-297-10c-12 2-15 12-23 23l-41 58H340l22-30c8-12 23-13 30-4s20 24 24 38-10 10-17 10l-68 2q-17 1-48 30c-7 6-10 20 0 24s15-8 20-13 20 -20 58-21h50 c20 2 33 9 52 30 8 10 24-4 16-13L384 65q-3-2-5-1zm131 0c-10 1-12 12-11 20v96c1 10-3 23 5 32s20-5 17-15c0-23-3-46 2-67 5-12 22-14 32-5l103 87c7 5 19 1 18-9v-64c-3-10-20-9-21 2s-20 22-30 13l-97-80c-5-4-10-10-18-10zM701 76v128c2 10 15 12 20 4s0-102 0-124s-20-18-20-7z M850 63c-35 0-69-2-86 15s-20 60-13 66 13 8 16 0 1-10 1-27 12-26 20-32 66-5 85-5 31 4 31-10-18-7-54-7M764 159c-6-2-15-2-16 12s19 37 33 43 23 8 25-4-4-11-11-14q-9-3-22-18c-4-7-3-16-10-19zM828 196c-4 0-8 1-10 5s-4 12 0 15 8 2 12 2h60c5 0 10-2 12-6 3-7-1-16-8-16z" fill="#1f1f1f"/></svg>""" # noqa
ansi_pattern = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

View File

@@ -9,6 +9,7 @@ from sanic.mixins.listeners import ListenerMixin
from sanic.mixins.middleware import MiddlewareMixin
from sanic.mixins.routes import RouteMixin
from sanic.mixins.signals import SignalMixin
from sanic.mixins.static import StaticMixin
VALID_NAME = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_\-]*$")
@@ -16,6 +17,7 @@ VALID_NAME = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_\-]*$")
class BaseSanic(
RouteMixin,
StaticMixin,
MiddlewareMixin,
ListenerMixin,
ExceptionMixin,

View File

@@ -304,9 +304,6 @@ class Blueprint(BaseSanic):
# Routes
for future in self._future_routes:
# attach the blueprint name to the handler so that it can be
# prefixed properly in the router
future.handler.__blueprintname__ = self.name
# Prepend the blueprint URI prefix if available
uri = self._setup_uri(future.uri, url_prefix)

View File

@@ -0,0 +1,18 @@
from sanic.__version__ import __compatibility__
if __compatibility__ == "22.12":
from .v22_12.request import (
File,
Request,
RequestParameters,
parse_multipart_form,
)
elif __compatibility__ == "23.3":
from .v23_3.request import (
File,
Request,
RequestParameters,
parse_multipart_form,
)
else:
raise RuntimeError(f"Unknown compatibility value: {__compatibility__}")

View File

File diff suppressed because it is too large Load Diff

View File

View File

@@ -0,0 +1,9 @@
from ..v22_12.request import File
from ..v22_12.request import Request as LegacyRequest
from ..v22_12.request import RequestParameters, parse_multipart_form
class Request(LegacyRequest):
@property
def something_new(self):
return 123

View File

@@ -88,6 +88,12 @@ class Header(CIMultiDict):
very similar to a regular dictionary.
"""
def __getattr__(self, key: str) -> str:
if key.startswith("_"):
return self.__getattribute__(key)
key = key.rstrip("_").replace("_", "-")
return ",".join(self.getall(key, default=[]))
def get_all(self, key: str):
"""
Convenience method mapped to ``getall()``.

View File

@@ -12,6 +12,7 @@ Setting ``app.config.FALLBACK_ERROR_FORMAT = "auto"`` will enable a switch that
will attempt to provide an appropriate response format based upon the
request type.
"""
from __future__ import annotations
import sys
import typing as t
@@ -21,8 +22,9 @@ from traceback import extract_tb
from sanic.exceptions import BadRequest, SanicException
from sanic.helpers import STATUS_CODES
from sanic.request import Request
from sanic.response import HTTPResponse, html, json, text
from sanic.log import deprecation, logger
from sanic.pages.error import ErrorPage
from sanic.response import html, json, text
dumps: t.Callable[..., str]
@@ -33,13 +35,15 @@ try:
except ImportError: # noqa
from json import dumps
if t.TYPE_CHECKING:
from sanic import HTTPResponse, Request
DEFAULT_FORMAT = "auto"
FALLBACK_TEXT = (
"The server encountered an internal error and "
"cannot complete your request."
)
FALLBACK_TEXT = """\
The application encountered an unexpected error and could not continue.\
"""
FALLBACK_STATUS = 500
JSON = "application/json"
class BaseRenderer:
@@ -113,134 +117,18 @@ class HTMLRenderer(BaseRenderer):
The default fallback type.
"""
TRACEBACK_STYLE = """
html { font-family: sans-serif }
h2 { color: #888; }
.tb-wrapper p, dl, dd { margin: 0 }
.frame-border { margin: 1rem }
.frame-line > *, dt, dd { padding: 0.3rem 0.6rem }
.frame-line, dl { margin-bottom: 0.3rem }
.frame-code, dd { font-size: 16px; padding-left: 4ch }
.tb-wrapper, dl { border: 1px solid #eee }
.tb-header,.obj-header {
background: #eee; padding: 0.3rem; font-weight: bold
}
.frame-descriptor, dt { background: #e2eafb; font-size: 14px }
"""
TRACEBACK_WRAPPER_HTML = (
"<div class=tb-header>{exc_name}: {exc_value}</div>"
"<div class=tb-wrapper>{frame_html}</div>"
)
TRACEBACK_BORDER = (
"<div class=frame-border>"
"The above exception was the direct cause of the following exception:"
"</div>"
)
TRACEBACK_LINE_HTML = (
"<div class=frame-line>"
"<p class=frame-descriptor>"
"File {0.filename}, line <i>{0.lineno}</i>, "
"in <code><b>{0.name}</b></code>"
"<p class=frame-code><code>{0.line}</code>"
"</div>"
)
OBJECT_WRAPPER_HTML = (
"<div class=obj-header>{title}</div>"
"<dl class={obj_type}>{display_html}</dl>"
)
OBJECT_DISPLAY_HTML = "<dt>{key}</dt><dd><code>{value}</code></dd>"
OUTPUT_HTML = (
"<!DOCTYPE html><html lang=en>"
"<meta charset=UTF-8><title>{title}</title>\n"
"<style>{style}</style>\n"
"<h1>{title}</h1><p>{text}\n"
"{body}"
)
def full(self) -> HTTPResponse:
return html(
self.OUTPUT_HTML.format(
title=self.title,
text=self.text,
style=self.TRACEBACK_STYLE,
body=self._generate_body(full=True),
),
status=self.status,
page = ErrorPage(
debug=self.debug,
title=super().title,
text=super().text,
request=self.request,
exc=self.exception,
)
return html(page.render(), status=self.status, headers=self.headers)
def minimal(self) -> HTTPResponse:
return html(
self.OUTPUT_HTML.format(
title=self.title,
text=self.text,
style=self.TRACEBACK_STYLE,
body=self._generate_body(full=False),
),
status=self.status,
headers=self.headers,
)
@property
def text(self):
return escape(super().text)
@property
def title(self):
return escape(f"⚠️ {super().title}")
def _generate_body(self, *, full):
lines = []
if full:
_, exc_value, __ = sys.exc_info()
exceptions = []
while exc_value:
exceptions.append(self._format_exc(exc_value))
exc_value = exc_value.__cause__
traceback_html = self.TRACEBACK_BORDER.join(reversed(exceptions))
appname = escape(self.request.app.name)
name = escape(self.exception.__class__.__name__)
value = escape(self.exception)
path = escape(self.request.path)
lines += [
f"<h2>Traceback of {appname} " "(most recent call last):</h2>",
f"{traceback_html}",
"<div class=summary><p>",
f"<b>{name}: {value}</b> "
f"while handling path <code>{path}</code>",
"</div>",
]
for attr, display in (("context", True), ("extra", bool(full))):
info = getattr(self.exception, attr, None)
if info and display:
lines.append(self._generate_object_display(info, attr))
return "\n".join(lines)
def _generate_object_display(
self, obj: t.Dict[str, t.Any], descriptor: str
) -> str:
display = "".join(
self.OBJECT_DISPLAY_HTML.format(key=key, value=value)
for key, value in obj.items()
)
return self.OBJECT_WRAPPER_HTML.format(
title=descriptor.title(),
display_html=display,
obj_type=descriptor.lower(),
)
def _format_exc(self, exc):
frames = extract_tb(exc.__traceback__)
frame_html = "".join(
self.TRACEBACK_LINE_HTML.format(frame) for frame in frames
)
return self.TRACEBACK_WRAPPER_HTML.format(
exc_name=escape(exc.__class__.__name__),
exc_value=escape(exc),
frame_html=frame_html,
)
return self.full()
class TextRenderer(BaseRenderer):
@@ -388,32 +276,26 @@ def escape(text):
return f"{text}".replace("&", "&amp;").replace("<", "&lt;")
RENDERERS_BY_CONFIG = {
"html": HTMLRenderer,
"json": JSONRenderer,
"text": TextRenderer,
MIME_BY_CONFIG = {
"text": "text/plain",
"json": "application/json",
"html": "text/html",
}
CONFIG_BY_MIME = {v: k for k, v in MIME_BY_CONFIG.items()}
RENDERERS_BY_CONTENT_TYPE = {
"text/plain": TextRenderer,
"application/json": JSONRenderer,
"multipart/form-data": HTMLRenderer,
"text/html": HTMLRenderer,
}
CONTENT_TYPE_BY_RENDERERS = {
v: k for k, v in RENDERERS_BY_CONTENT_TYPE.items()
}
# Handler source code is checked for which response types it returns with the
# route error_format="auto" (default) to determine which format to use.
RESPONSE_MAPPING = {
"empty": "html",
"json": "json",
"text": "text",
"raw": "text",
"html": "html",
"file": "html",
"file_stream": "text",
"stream": "text",
"redirect": "html",
"JSONResponse": "json",
"text/plain": "text",
"text/html": "html",
"application/json": "json",
@@ -421,7 +303,7 @@ RESPONSE_MAPPING = {
def check_error_format(format):
if format not in RENDERERS_BY_CONFIG and format != "auto":
if format not in MIME_BY_CONFIG and format != "auto":
raise SanicException(f"Unknown format: {format}")
@@ -436,98 +318,68 @@ def exception_response(
"""
Render a response for the default FALLBACK exception handler.
"""
content_type = None
if not renderer:
# Make sure we have something set
renderer = base
render_format = fallback
if request:
# If there is a request, try and get the format
# from the route
if request.route:
try:
if request.route.extra.error_format:
render_format = request.route.extra.error_format
except AttributeError:
...
content_type = request.headers.getone("content-type", "").split(
";"
)[0]
acceptable = request.accept
# If the format is auto still, make a guess
if render_format == "auto":
# First, if there is an Accept header, check if text/html
# is the first option
# According to MDN Web Docs, all major browsers use text/html
# as the primary value in Accept (with the exception of IE 8,
# and, well, if you are supporting IE 8, then you have bigger
# problems to concern yourself with than what default exception
# renderer is used)
# Source:
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Content_negotiation/List_of_default_Accept_values
if acceptable and acceptable[0].match(
"text/html",
allow_type_wildcard=False,
allow_subtype_wildcard=False,
):
renderer = HTMLRenderer
# Second, if there is an Accept header, check if
# application/json is an option, or if the content-type
# is application/json
elif (
acceptable
and acceptable.match(
"application/json",
allow_type_wildcard=False,
allow_subtype_wildcard=False,
)
or content_type == "application/json"
):
renderer = JSONRenderer
# Third, if there is no Accept header, assume we want text.
# The likely use case here is a raw socket.
elif not acceptable:
renderer = TextRenderer
else:
# Fourth, look to see if there was a JSON body
# When in this situation, the request is probably coming
# from curl, an API client like Postman or Insomnia, or a
# package like requests or httpx
try:
# Give them the benefit of the doubt if they did:
# $ curl localhost:8000 -d '{"foo": "bar"}'
# And provide them with JSONRenderer
renderer = JSONRenderer if request.json else base
except BadRequest:
renderer = base
else:
renderer = RENDERERS_BY_CONFIG.get(render_format, renderer)
# Lastly, if there is an Accept header, make sure
# our choice is okay
if acceptable:
type_ = CONTENT_TYPE_BY_RENDERERS.get(renderer) # type: ignore
if type_ and type_ not in acceptable:
# If the renderer selected is not in the Accept header
# look through what is in the Accept header, and select
# the first option that matches. Otherwise, just drop back
# to the original default
for accept in acceptable:
mtype = f"{accept.type_}/{accept.subtype}"
maybe = RENDERERS_BY_CONTENT_TYPE.get(mtype)
if maybe:
renderer = maybe
break
else:
renderer = base
mt = guess_mime(request, fallback)
renderer = RENDERERS_BY_CONTENT_TYPE.get(mt, base)
renderer = t.cast(t.Type[BaseRenderer], renderer)
return renderer(request, exception, debug).render()
def guess_mime(req: Request, fallback: str) -> str:
# Attempt to find a suitable MIME format for the response.
# Insertion-ordered map of formats["html"] = "source of that suggestion"
formats = {}
name = ""
# Route error_format (by magic from handler code if auto, the default)
if req.route:
name = req.route.name
f = req.route.extra.error_format
if f in MIME_BY_CONFIG:
formats[f] = name
if not formats and fallback in MIME_BY_CONFIG:
formats[fallback] = "FALLBACK_ERROR_FORMAT"
# If still not known, check for the request for clues of JSON
if not formats and fallback == "auto" and req.accept.match(JSON):
if JSON in req.accept: # Literally, not wildcard
formats["json"] = "request.accept"
elif JSON in req.headers.getone("content-type", ""):
formats["json"] = "content-type"
# DEPRECATION: Remove this block in 24.3
else:
c = None
try:
c = req.json
except BadRequest:
pass
if c:
formats["json"] = "request.json"
deprecation(
"Response type was determined by the JSON content of "
"the request. This behavior is deprecated and will be "
"removed in v24.3. Please specify the format either by\n"
f' error_format="json" on route {name}, by\n'
' FALLBACK_ERROR_FORMAT = "json", or by adding header\n'
" accept: application/json to your requests.",
24.3,
)
# Any other supported formats
if fallback == "auto":
for k in MIME_BY_CONFIG:
if k not in formats:
formats[k] = "any"
mimes = [MIME_BY_CONFIG[k] for k in formats]
m = req.accept.match(*mimes)
if m:
format = CONFIG_BY_MIME[m.mime]
source = formats[format]
logger.debug(
f"The client accepts {m.header}, using '{format}' from {source}"
)
else:
logger.debug(f"No format found, the client accepts {req.accept!r}")
return m.mime

View File

@@ -0,0 +1,10 @@
from .content_range import ContentRangeHandler
from .directory import DirectoryHandler
from .error import ErrorHandler
__all__ = (
"ContentRangeHandler",
"DirectoryHandler",
"ErrorHandler",
)

View File

@@ -0,0 +1,78 @@
from __future__ import annotations
from sanic.exceptions import (
HeaderNotFound,
InvalidRangeType,
RangeNotSatisfiable,
)
class ContentRangeHandler:
"""
A mechanism to parse and process the incoming request headers to
extract the content range information.
:param request: Incoming api request
:param stats: Stats related to the content
:type request: :class:`sanic.request.Request`
:type stats: :class:`posix.stat_result`
:ivar start: Content Range start
:ivar end: Content Range end
:ivar size: Length of the content
:ivar total: Total size identified by the :class:`posix.stat_result`
instance
:ivar ContentRangeHandler.headers: Content range header ``dict``
"""
__slots__ = ("start", "end", "size", "total", "headers")
def __init__(self, request, stats):
self.total = stats.st_size
_range = request.headers.getone("range", None)
if _range is None:
raise HeaderNotFound("Range Header Not Found")
unit, _, value = tuple(map(str.strip, _range.partition("=")))
if unit != "bytes":
raise InvalidRangeType(
"%s is not a valid Range Type" % (unit,), self
)
start_b, _, end_b = tuple(map(str.strip, value.partition("-")))
try:
self.start = int(start_b) if start_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (start_b,), self
)
try:
self.end = int(end_b) if end_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (end_b,), self
)
if self.end is None:
if self.start is None:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
else:
# this case represents `Content-Range: bytes 5-`
self.end = self.total - 1
else:
if self.start is None:
# this case represents `Content-Range: bytes -5`
self.start = self.total - self.end
self.end = self.total - 1
if self.start >= self.end:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
self.size = self.end - self.start + 1
self.headers = {
"Content-Range": "bytes %s-%s/%s"
% (self.start, self.end, self.total)
}
def __bool__(self):
return self.size > 0

View File

@@ -0,0 +1,84 @@
from __future__ import annotations
from datetime import datetime
from operator import itemgetter
from pathlib import Path
from stat import S_ISDIR
from typing import Dict, Iterable, Optional, Sequence, Union, cast
from sanic.exceptions import NotFound
from sanic.pages.directory_page import DirectoryPage, FileInfo
from sanic.request import Request
from sanic.response import file, html, redirect
class DirectoryHandler:
def __init__(
self,
uri: str,
directory: Path,
directory_view: bool = False,
index: Optional[Union[str, Sequence[str]]] = None,
) -> None:
if isinstance(index, str):
index = [index]
elif index is None:
index = []
self.base = uri.strip("/")
self.directory = directory
self.directory_view = directory_view
self.index = tuple(index)
async def handle(self, request: Request, path: str):
current = path.strip("/")[len(self.base) :].strip("/") # noqa: E203
for file_name in self.index:
index_file = self.directory / current / file_name
if index_file.is_file():
return await file(index_file)
if self.directory_view:
return self._index(
self.directory / current, path, request.app.debug
)
if self.index:
raise NotFound("File not found")
raise IsADirectoryError(f"{self.directory.as_posix()} is a directory")
def _index(self, location: Path, path: str, debug: bool):
# Remove empty path elements, append slash
if "//" in path or not path.endswith("/"):
return redirect(
"/" + "".join([f"{p}/" for p in path.split("/") if p])
)
# Render file browser
page = DirectoryPage(self._iter_files(location), path, debug)
return html(page.render())
def _prepare_file(self, path: Path) -> Dict[str, Union[int, str]]:
stat = path.stat()
modified = (
datetime.fromtimestamp(stat.st_mtime)
.isoformat()[:19]
.replace("T", " ")
)
is_dir = S_ISDIR(stat.st_mode)
icon = "📁" if is_dir else "📄"
file_name = path.name
if is_dir:
file_name += "/"
return {
"priority": is_dir * -1,
"file_name": file_name,
"icon": icon,
"file_access": modified,
"file_size": stat.st_size,
}
def _iter_files(self, location: Path) -> Iterable[FileInfo]:
prepared = [self._prepare_file(f) for f in location.iterdir()]
for item in sorted(prepared, key=itemgetter("priority", "file_name")):
del item["priority"]
yield cast(FileInfo, item)

View File

@@ -3,11 +3,6 @@ from __future__ import annotations
from typing import Dict, List, Optional, Tuple, Type
from sanic.errorpages import BaseRenderer, TextRenderer, exception_response
from sanic.exceptions import (
HeaderNotFound,
InvalidRangeType,
RangeNotSatisfiable,
)
from sanic.log import deprecation, error_logger
from sanic.models.handler_types import RouteHandler
from sanic.response import text
@@ -23,7 +18,6 @@ class ErrorHandler:
by the developers to perform a wide range of tasks from recording the error
stats to reporting them to an external service that can be used for
realtime alerting system.
"""
def __init__(
@@ -196,74 +190,3 @@ class ErrorHandler:
error_logger.exception(
"Exception occurred while handling uri: %s", url
)
class ContentRangeHandler:
"""
A mechanism to parse and process the incoming request headers to
extract the content range information.
:param request: Incoming api request
:param stats: Stats related to the content
:type request: :class:`sanic.request.Request`
:type stats: :class:`posix.stat_result`
:ivar start: Content Range start
:ivar end: Content Range end
:ivar size: Length of the content
:ivar total: Total size identified by the :class:`posix.stat_result`
instance
:ivar ContentRangeHandler.headers: Content range header ``dict``
"""
__slots__ = ("start", "end", "size", "total", "headers")
def __init__(self, request, stats):
self.total = stats.st_size
_range = request.headers.getone("range", None)
if _range is None:
raise HeaderNotFound("Range Header Not Found")
unit, _, value = tuple(map(str.strip, _range.partition("=")))
if unit != "bytes":
raise InvalidRangeType(
"%s is not a valid Range Type" % (unit,), self
)
start_b, _, end_b = tuple(map(str.strip, value.partition("-")))
try:
self.start = int(start_b) if start_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (start_b,), self
)
try:
self.end = int(end_b) if end_b else None
except ValueError:
raise RangeNotSatisfiable(
"'%s' is invalid for Content Range" % (end_b,), self
)
if self.end is None:
if self.start is None:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
else:
# this case represents `Content-Range: bytes 5-`
self.end = self.total - 1
else:
if self.start is None:
# this case represents `Content-Range: bytes -5`
self.start = self.total - self.end
self.end = self.total - 1
if self.start >= self.end:
raise RangeNotSatisfiable(
"Invalid for Content Range parameters", self
)
self.size = self.end - self.start + 1
self.headers = {
"Content-Range": "bytes %s-%s/%s"
% (self.start, self.end, self.total)
}
def __bool__(self):
return self.size > 0

View File

@@ -19,7 +19,6 @@ OptionsIterable = Iterable[Tuple[str, str]] # May contain duplicate keys
_token, _quoted = r"([\w!#$%&'*+\-.^_`|~]+)", r'"([^"]*)"'
_param = re.compile(rf";\s*{_token}=(?:{_token}|{_quoted})", re.ASCII)
_firefox_quote_escape = re.compile(r'\\"(?!; |\s*$)')
_ipv6 = "(?:[0-9A-Fa-f]{0,4}:){2,7}[0-9A-Fa-f]{0,4}"
_ipv6_re = re.compile(_ipv6)
_host_re = re.compile(
@@ -33,143 +32,96 @@ _host_re = re.compile(
# For more information, consult ../tests/test_requests.py
def parse_arg_as_accept(f):
def func(self, other, *args, **kwargs):
if not isinstance(other, Accept) and other:
other = Accept.parse(other)
return f(self, other, *args, **kwargs)
return func
class MediaType(str):
def __new__(cls, value: str):
return str.__new__(cls, value)
def __init__(self, value: str) -> None:
self.value = value
self.is_wildcard = self.check_if_wildcard(value)
def __eq__(self, other):
if self.is_wildcard:
return True
if self.match(other):
return True
other_is_wildcard = (
other.is_wildcard
if isinstance(other, MediaType)
else self.check_if_wildcard(other)
)
return other_is_wildcard
def match(self, other):
other_value = other.value if isinstance(other, MediaType) else other
return self.value == other_value
@staticmethod
def check_if_wildcard(value):
return value == "*"
class Accept(str):
def __new__(cls, value: str, *args, **kwargs):
return str.__new__(cls, value)
class MediaType:
"""A media type, as used in the Accept header."""
def __init__(
self,
value: str,
type_: MediaType,
subtype: MediaType,
*,
q: str = "1.0",
**kwargs: str,
type_: str,
subtype: str,
**params: str,
):
qvalue = float(q)
if qvalue > 1 or qvalue < 0:
raise InvalidHeader(
f"Accept header qvalue must be between 0 and 1, not: {qvalue}"
)
self.value = value
self.type_ = type_
self.type = type_
self.subtype = subtype
self.qvalue = qvalue
self.params = kwargs
self.q = float(params.get("q", "1.0"))
self.params = params
self.mime = f"{type_}/{subtype}"
self.key = (
-1 * self.q,
-1 * len(self.params),
self.subtype == "*",
self.type == "*",
)
def _compare(self, other, method):
try:
return method(self.qvalue, other.qvalue)
except (AttributeError, TypeError):
return NotImplemented
def __repr__(self):
return self.mime + "".join(f";{k}={v}" for k, v in self.params.items())
@parse_arg_as_accept
def __lt__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s < o)
def __eq__(self, other):
"""Check for mime (str or MediaType) identical type/subtype.
Parameters such as q are not considered."""
if isinstance(other, str):
# Give a friendly reminder if str contains parameters
if ";" in other:
raise ValueError("Use match() to compare with parameters")
return self.mime == other
if isinstance(other, MediaType):
# Ignore parameters silently with MediaType objects
return self.mime == other.mime
return NotImplemented
@parse_arg_as_accept
def __le__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s <= o)
@parse_arg_as_accept
def __eq__(self, other: Union[str, Accept]): # type: ignore
return self._compare(other, lambda s, o: s == o)
@parse_arg_as_accept
def __ge__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s >= o)
@parse_arg_as_accept
def __gt__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s > o)
@parse_arg_as_accept
def __ne__(self, other: Union[str, Accept]): # type: ignore
return self._compare(other, lambda s, o: s != o)
@parse_arg_as_accept
def match(
self,
other,
*,
allow_type_wildcard: bool = True,
allow_subtype_wildcard: bool = True,
) -> bool:
type_match = (
self.type_ == other.type_
if allow_type_wildcard
else (
self.type_.match(other.type_)
and not self.type_.is_wildcard
and not other.type_.is_wildcard
)
mime_with_params: Union[str, MediaType],
) -> Optional[MediaType]:
"""Check if this media type matches the given mime type/subtype.
Wildcards are supported both ways on both type and subtype.
If mime contains a semicolon, optionally followed by parameters,
the parameters of the two media types must match exactly.
Note: Use the `==` operator instead to check for literal matches
without expanding wildcards.
@param media_type: A type/subtype string to match.
@return `self` if the media types are compatible, else `None`
"""
mt = (
MediaType._parse(mime_with_params)
if isinstance(mime_with_params, str)
else mime_with_params
)
subtype_match = (
self.subtype == other.subtype
if allow_subtype_wildcard
else (
self.subtype.match(other.subtype)
and not self.subtype.is_wildcard
and not other.subtype.is_wildcard
return (
self
if (
mt
# All parameters given in the other media type must match
and all(self.params.get(k) == v for k, v in mt.params.items())
# Subtype match
and (
self.subtype == mt.subtype
or self.subtype == "*"
or mt.subtype == "*"
)
# Type match
and (
self.type == mt.type or self.type == "*" or mt.type == "*"
)
)
else None
)
return type_match and subtype_match
@property
def has_wildcard(self) -> bool:
"""Return True if this media type has a wildcard in it."""
return any(part == "*" for part in (self.subtype, self.type))
@classmethod
def parse(cls, raw: str) -> Accept:
invalid = False
mtype = raw.strip()
def _parse(cls, mime_with_params: str) -> Optional[MediaType]:
mtype = mime_with_params.strip()
if "/" not in mime_with_params:
return None
try:
media, *raw_params = mtype.split(";")
type_, subtype = media.split("/")
except ValueError:
invalid = True
if invalid or not type_ or not subtype:
raise InvalidHeader(f"Header contains invalid Accept value: {raw}")
mime, *raw_params = mtype.split(";")
type_, subtype = mime.split("/", 1)
if not type_ or not subtype:
raise ValueError(f"Invalid media type: {mtype}")
params = dict(
[
@@ -178,46 +130,160 @@ class Accept(str):
]
)
return cls(mtype, MediaType(type_), MediaType(subtype), **params)
return cls(type_.lstrip(), subtype.rstrip(), **params)
class AcceptContainer(list):
def __contains__(self, o: object) -> bool:
return any(item.match(o) for item in self)
class Matched:
"""A matching result of a MIME string against a header."""
def match(
self,
o: object,
*,
allow_type_wildcard: bool = True,
allow_subtype_wildcard: bool = True,
) -> bool:
return any(
item.match(
o,
allow_type_wildcard=allow_type_wildcard,
allow_subtype_wildcard=allow_subtype_wildcard,
def __init__(self, mime: str, header: Optional[MediaType]):
self.mime = mime
self.header = header
def __repr__(self):
return f"<{self} matched {self.header}>" if self else "<no match>"
def __str__(self):
return self.mime
def __bool__(self):
return self.header is not None
def __eq__(self, other: Any) -> bool:
try:
comp, other_accept = self._compare(other)
except TypeError:
return False
return bool(
comp
and (
(self.header and other_accept.header)
or (not self.header and not other_accept.header)
)
for item in self
)
def _compare(self, other) -> Tuple[bool, Matched]:
if isinstance(other, str):
parsed = Matched.parse(other)
if self.mime == other:
return True, parsed
other = parsed
if isinstance(other, Matched):
return self.header == other.header, other
raise TypeError(
"Comparison not supported between unequal "
f"mime types of '{self.mime}' and '{other}'"
)
def match(self, other: Union[str, Matched]) -> Optional[Matched]:
accept = Matched.parse(other) if isinstance(other, str) else other
if not self.header or not accept.header:
return None
if self.header.match(accept.header):
return accept
return None
@classmethod
def parse(cls, raw: str) -> Matched:
media_type = MediaType._parse(raw)
return cls(raw, media_type)
class AcceptList(list):
"""A list of media types, as used in the Accept header.
The Accept header entries are listed in order of preference, starting
with the most preferred. This class is a list of `MediaType` objects,
that encapsulate also the q value or any other parameters.
Two separate methods are provided for searching the list:
- 'match' for finding the most preferred match (wildcards supported)
- operator 'in' for checking explicit matches (wildcards as literals)
"""
def match(self, *mimes: str, accept_wildcards=True) -> Matched:
"""Find a media type accepted by the client.
This method can be used to find which of the media types requested by
the client is most preferred against the ones given as arguments.
The ordering of preference is set by:
1. The order set by RFC 7231, s. 5.3.2, giving a higher priority
to q values and more specific type definitions,
2. The order of the arguments (first is most preferred), and
3. The first matching entry on the Accept header.
Wildcards are matched both ways. A match is usually found, as the
Accept headers typically include `*/*`, in particular if the header
is missing, is not manually set, or if the client is a browser.
Note: the returned object behaves as a string of the mime argument
that matched, and is empty/falsy if no match was found. The matched
header entry `MediaType` or `None` is available as the `m` attribute.
@param mimes: Any MIME types to search for in order of preference.
@param accept_wildcards: Match Accept entries with wildcards in them.
@return A match object with the mime string and the MediaType object.
"""
a = sorted(
(-acc.q, i, j, mime, acc)
for j, acc in enumerate(self)
if accept_wildcards or not acc.has_wildcard
for i, mime in enumerate(mimes)
if acc.match(mime)
)
return Matched(*(a[0][-2:] if a else ("", None)))
def __str__(self):
"""Format as Accept header value (parsed, not original)."""
return ", ".join(str(m) for m in self)
def parse_accept(accept: Optional[str]) -> AcceptList:
"""Parse an Accept header and order the acceptable media types in
according to RFC 7231, s. 5.3.2
https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2
"""
if not accept:
if accept == "":
return AcceptList() # Empty header, accept nothing
accept = "*/*" # No header means that all types are accepted
try:
a = [
mt
for mt in [MediaType._parse(mtype) for mtype in accept.split(",")]
if mt
]
if not a:
raise ValueError
return AcceptList(sorted(a, key=lambda x: x.key))
except ValueError:
raise InvalidHeader(f"Invalid header value in Accept: {accept}")
def parse_content_header(value: str) -> Tuple[str, Options]:
"""Parse content-type and content-disposition header values.
E.g. 'form-data; name=upload; filename=\"file.txt\"' to
E.g. `form-data; name=upload; filename="file.txt"` to
('form-data', {'name': 'upload', 'filename': 'file.txt'})
Mostly identical to cgi.parse_header and werkzeug.parse_options_header
but runs faster and handles special characters better. Unescapes quotes.
but runs faster and handles special characters better.
Unescapes %22 to `"` and %0D%0A to `\n` in field values.
"""
value = _firefox_quote_escape.sub("%22", value)
pos = value.find(";")
if pos == -1:
options: Dict[str, Union[int, str]] = {}
else:
options = {
m.group(1).lower(): m.group(2) or m.group(3).replace("%22", '"')
m.group(1)
.lower(): (m.group(2) or m.group(3))
.replace("%22", '"')
.replace("%0D%0A", "\n")
for m in _param.finditer(value[pos:])
}
value = value[:pos]
@@ -368,34 +434,6 @@ def format_http1_response(status: int, headers: HeaderBytesIterable) -> bytes:
return ret
def _sort_accept_value(accept: Accept):
return (
accept.qvalue,
len(accept.params),
accept.subtype != "*",
accept.type_ != "*",
)
def parse_accept(accept: str) -> AcceptContainer:
"""Parse an Accept header and order the acceptable media types in
accorsing to RFC 7231, s. 5.3.2
https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2
"""
media_types = accept.split(",")
accept_list: List[Accept] = []
for mtype in media_types:
if not mtype:
continue
accept_list.append(Accept.parse(mtype))
return AcceptContainer(
sorted(accept_list, key=_sort_accept_value, reverse=True)
)
def parse_credentials(
header: Optional[str],
prefixes: Union[List, Tuple, Set] = None,

35
sanic/mixins/base.py Normal file
View File

@@ -0,0 +1,35 @@
from typing import Optional
from sanic.base.meta import SanicMeta
class BaseMixin(metaclass=SanicMeta):
name: str
strict_slashes: Optional[bool]
def _generate_name(self, *objects) -> str:
name = None
for obj in objects:
if obj:
if isinstance(obj, str):
name = obj
break
try:
name = obj.name
except AttributeError:
try:
name = obj.__name__
except AttributeError:
continue
else:
break
if not name: # noqa
raise ValueError("Could not generate a name for handler")
if not name.startswith(f"{self.name}."):
name = f"{self.name}.{name}"
return name

View File

@@ -1,11 +1,6 @@
from ast import NodeVisitor, Return, parse
from contextlib import suppress
from email.utils import formatdate
from functools import partial, wraps
from inspect import getsource, signature
from mimetypes import guess_type
from os import path
from pathlib import Path, PurePath
from textwrap import dedent
from typing import (
Any,
@@ -19,20 +14,15 @@ from typing import (
Union,
cast,
)
from urllib.parse import unquote
from sanic_routing.route import Route
from sanic.base.meta import SanicMeta
from sanic.compat import stat_async
from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE, HTTP_METHODS
from sanic.constants import HTTP_METHODS
from sanic.errorpages import RESPONSE_MAPPING
from sanic.exceptions import FileNotFound, HeaderNotFound, RangeNotSatisfiable
from sanic.handlers import ContentRangeHandler
from sanic.log import error_logger
from sanic.mixins.base import BaseMixin
from sanic.models.futures import FutureRoute, FutureStatic
from sanic.models.handler_types import RouteHandler
from sanic.response import HTTPResponse, file, file_stream, validate_file
from sanic.types import HashableDict
@@ -41,20 +31,14 @@ RouteWrapper = Callable[
]
class RouteMixin(metaclass=SanicMeta):
name: str
class RouteMixin(BaseMixin, metaclass=SanicMeta):
def __init__(self, *args, **kwargs) -> None:
self._future_routes: Set[FutureRoute] = set()
self._future_statics: Set[FutureStatic] = set()
self.strict_slashes: Optional[bool] = False
def _apply_route(self, route: FutureRoute) -> List[Route]:
raise NotImplementedError # noqa
def _apply_static(self, static: FutureStatic) -> Route:
raise NotImplementedError # noqa
def route(
self,
uri: str,
@@ -688,324 +672,6 @@ class RouteMixin(metaclass=SanicMeta):
**ctx_kwargs,
)(handler)
def static(
self,
uri: str,
file_or_directory: Union[str, bytes, PurePath],
pattern: str = r"/?.+",
use_modified_since: bool = True,
use_content_range: bool = False,
stream_large_files: bool = False,
name: str = "static",
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
content_type: Optional[bool] = None,
apply: bool = True,
resource_type: Optional[str] = None,
):
"""
Register a root to serve files from. The input can either be a
file or a directory. This method will enable an easy and simple way
to setup the :class:`Route` necessary to serve the static files.
:param uri: URL path to be used for serving static content
:param file_or_directory: Path for the Static file/directory with
static files
:param pattern: Regex Pattern identifying the valid static files
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the
:func:`StreamingHTTPResponse.file_stream` handler rather
than the :func:`HTTPResponse.file` handler to send the file.
If this is an integer, this represents the threshold size to
switch to :func:`StreamingHTTPResponse.file_stream`
:param name: user defined name used for url_for
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param content_type: user defined content type for header
:return: routes registered on the router
:rtype: List[sanic.router.Route]
"""
name = self._generate_name(name)
if strict_slashes is None and self.strict_slashes is not None:
strict_slashes = self.strict_slashes
if not isinstance(file_or_directory, (str, bytes, PurePath)):
raise ValueError(
f"Static route must be a valid path, not {file_or_directory}"
)
static = FutureStatic(
uri,
file_or_directory,
pattern,
use_modified_since,
use_content_range,
stream_large_files,
name,
host,
strict_slashes,
content_type,
resource_type,
)
self._future_statics.add(static)
if apply:
self._apply_static(static)
def _generate_name(self, *objects) -> str:
name = None
for obj in objects:
if obj:
if isinstance(obj, str):
name = obj
break
try:
name = obj.name
except AttributeError:
try:
name = obj.__name__
except AttributeError:
continue
else:
break
if not name: # noqa
raise ValueError("Could not generate a name for handler")
if not name.startswith(f"{self.name}."):
name = f"{self.name}.{name}"
return name
async def _get_file_path(self, file_or_directory, __file_uri__, not_found):
file_path_raw = Path(unquote(file_or_directory))
root_path = file_path = file_path_raw.resolve()
if __file_uri__:
# Strip all / that in the beginning of the URL to help prevent
# python from herping a derp and treating the uri as an
# absolute path
unquoted_file_uri = unquote(__file_uri__).lstrip("/")
file_path_raw = Path(file_or_directory, unquoted_file_uri)
file_path = file_path_raw.resolve()
if (
file_path < root_path and not file_path_raw.is_symlink()
) or ".." in file_path_raw.parts:
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
try:
file_path.relative_to(root_path)
except ValueError:
if not file_path_raw.is_symlink():
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
return file_path
async def _static_request_handler(
self,
file_or_directory,
use_modified_since,
use_content_range,
stream_large_files,
request,
content_type=None,
__file_uri__=None,
):
not_found = FileNotFound(
"File not found",
path=file_or_directory,
relative_url=__file_uri__,
)
# Merge served directory and requested file if provided
file_path = await self._get_file_path(
file_or_directory, __file_uri__, not_found
)
try:
headers = {}
# Check if the client has been sent this file before
# and it has not been modified since
stats = None
if use_modified_since:
stats = await stat_async(file_path)
modified_since = stats.st_mtime
response = await validate_file(request.headers, modified_since)
if response:
return response
headers["Last-Modified"] = formatdate(
modified_since, usegmt=True
)
_range = None
if use_content_range:
_range = None
if not stats:
stats = await stat_async(file_path)
headers["Accept-Ranges"] = "bytes"
headers["Content-Length"] = str(stats.st_size)
if request.method != "HEAD":
try:
_range = ContentRangeHandler(request, stats)
except HeaderNotFound:
pass
else:
del headers["Content-Length"]
headers.update(_range.headers)
if "content-type" not in headers:
content_type = (
content_type
or guess_type(file_path)[0]
or DEFAULT_HTTP_CONTENT_TYPE
)
if "charset=" not in content_type and (
content_type.startswith("text/")
or content_type == "application/javascript"
):
content_type += "; charset=utf-8"
headers["Content-Type"] = content_type
if request.method == "HEAD":
return HTTPResponse(headers=headers)
else:
if stream_large_files:
if type(stream_large_files) == int:
threshold = stream_large_files
else:
threshold = 1024 * 1024
if not stats:
stats = await stat_async(file_path)
if stats.st_size >= threshold:
return await file_stream(
file_path, headers=headers, _range=_range
)
return await file(file_path, headers=headers, _range=_range)
except RangeNotSatisfiable:
raise
except FileNotFoundError:
raise not_found
except Exception:
error_logger.exception(
f"Exception in static request handler: "
f"path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise
def _register_static(
self,
static: FutureStatic,
):
# TODO: Though sanic is not a file server, I feel like we should
# at least make a good effort here. Modified-since is nice, but
# we could also look into etags, expires, and caching
"""
Register a static directory handler with Sanic by adding a route to the
router and registering a handler.
:param app: Sanic
:param file_or_directory: File or directory path to serve from
:type file_or_directory: Union[str,bytes,Path]
:param uri: URL to serve from
:type uri: str
:param pattern: regular expression used to match files in the URL
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the
server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the file_stream() handler
rather than the file() handler to send the file
If this is an integer, this represents the
threshold size to switch to file_stream()
:param name: user defined name used for url_for
:type name: str
:param content_type: user defined content type for header
:return: registered static routes
:rtype: List[sanic.router.Route]
"""
if isinstance(static.file_or_directory, bytes):
file_or_directory = static.file_or_directory.decode("utf-8")
elif isinstance(static.file_or_directory, PurePath):
file_or_directory = str(static.file_or_directory)
elif not isinstance(static.file_or_directory, str):
raise ValueError("Invalid file path string.")
else:
file_or_directory = static.file_or_directory
uri = static.uri
name = static.name
# If we're not trying to match a file directly,
# serve from the folder
if not static.resource_type:
if not path.isfile(file_or_directory):
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "dir":
if path.isfile(file_or_directory):
raise TypeError(
"Resource type improperly identified as directory. "
f"'{file_or_directory}'"
)
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "file" and not path.isfile(
file_or_directory
):
raise TypeError(
"Resource type improperly identified as file. "
f"'{file_or_directory}'"
)
elif static.resource_type != "file":
raise ValueError(
"The resource_type should be set to 'file' or 'dir'"
)
# special prefix for static files
# if not static.name.startswith("_static_"):
# name = f"_static_{static.name}"
_handler = wraps(self._static_request_handler)(
partial(
self._static_request_handler,
file_or_directory,
static.use_modified_since,
static.use_content_range,
static.stream_large_files,
content_type=static.content_type,
)
)
route, _ = self.route( # type: ignore
uri=uri,
methods=["GET", "HEAD"],
name=name,
host=static.host,
strict_slashes=static.strict_slashes,
static=True,
)(_handler)
return route
def _determine_error_format(self, handler) -> str:
with suppress(OSError, TypeError):
src = dedent(getsource(handler))

View File

@@ -877,7 +877,10 @@ class StartupMixin(metaclass=SanicMeta):
sync_manager.shutdown()
for sock in socks:
sock.shutdown(SHUT_RDWR)
try:
sock.shutdown(SHUT_RDWR)
except OSError:
...
sock.close()
socks = []
trigger_events(main_stop, loop, primary)

348
sanic/mixins/static.py Normal file
View File

@@ -0,0 +1,348 @@
from email.utils import formatdate
from functools import partial, wraps
from mimetypes import guess_type
from os import PathLike, path
from pathlib import Path, PurePath
from typing import Optional, Sequence, Set, Union, cast
from urllib.parse import unquote
from sanic_routing.route import Route
from sanic.base.meta import SanicMeta
from sanic.compat import stat_async
from sanic.constants import DEFAULT_HTTP_CONTENT_TYPE
from sanic.exceptions import FileNotFound, HeaderNotFound, RangeNotSatisfiable
from sanic.handlers import ContentRangeHandler
from sanic.handlers.directory import DirectoryHandler
from sanic.log import deprecation, error_logger
from sanic.mixins.base import BaseMixin
from sanic.models.futures import FutureStatic
from sanic.request import Request
from sanic.response import HTTPResponse, file, file_stream, validate_file
class StaticMixin(BaseMixin, metaclass=SanicMeta):
def __init__(self, *args, **kwargs) -> None:
self._future_statics: Set[FutureStatic] = set()
def _apply_static(self, static: FutureStatic) -> Route:
raise NotImplementedError # noqa
def static(
self,
uri: str,
file_or_directory: Union[PathLike, str, bytes],
pattern: str = r"/?.+",
use_modified_since: bool = True,
use_content_range: bool = False,
stream_large_files: Union[bool, int] = False,
name: str = "static",
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
content_type: Optional[str] = None,
apply: bool = True,
resource_type: Optional[str] = None,
index: Optional[Union[str, Sequence[str]]] = None,
directory_view: bool = False,
directory_handler: Optional[DirectoryHandler] = None,
):
"""
Register a root to serve files from. The input can either be a
file or a directory. This method will enable an easy and simple way
to setup the :class:`Route` necessary to serve the static files.
:param uri: URL path to be used for serving static content
:param file_or_directory: Path for the Static file/directory with
static files
:param pattern: Regex Pattern identifying the valid static files
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the
:func:`StreamingHTTPResponse.file_stream` handler rather
than the :func:`HTTPResponse.file` handler to send the file.
If this is an integer, this represents the threshold size to
switch to :func:`StreamingHTTPResponse.file_stream`
:param name: user defined name used for url_for
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param content_type: user defined content type for header
:param apply: If true, will register the route immediately
:param resource_type: Explicitly declare a resource to be a "
file" or a "dir"
:param index: When exposing against a directory, index is the name that
will be served as the default file. When multiple files names are
passed, then they will be tried in order.
:param directory_view: Whether to fallback to showing the directory
viewer when exposing a directory
:param directory_handler: An instance of :class:`DirectoryHandler`
that can be used for explicitly controlling and subclassing the
behavior of the default directory handler
:return: routes registered on the router
:rtype: List[sanic.router.Route]
"""
name = self._generate_name(name)
if strict_slashes is None and self.strict_slashes is not None:
strict_slashes = self.strict_slashes
if not isinstance(file_or_directory, (str, bytes, PurePath)):
raise ValueError(
f"Static route must be a valid path, not {file_or_directory}"
)
if isinstance(file_or_directory, bytes):
deprecation(
"Serving a static directory with a bytes string is "
"deprecated and will be removed in v22.9.",
22.9,
)
file_or_directory = cast(str, file_or_directory.decode())
file_or_directory = Path(file_or_directory)
if directory_handler and (directory_view or index):
raise ValueError(
"When explicitly setting directory_handler, you cannot "
"set either directory_view or index. Instead, pass "
"these arguments to your DirectoryHandler instance."
)
if not directory_handler:
directory_handler = DirectoryHandler(
uri=uri,
directory=file_or_directory,
directory_view=directory_view,
index=index,
)
static = FutureStatic(
uri,
file_or_directory,
pattern,
use_modified_since,
use_content_range,
stream_large_files,
name,
host,
strict_slashes,
content_type,
resource_type,
directory_handler,
)
self._future_statics.add(static)
if apply:
self._apply_static(static)
class StaticHandleMixin(metaclass=SanicMeta):
def _apply_static(self, static: FutureStatic) -> Route:
return self._register_static(static)
def _register_static(
self,
static: FutureStatic,
):
# TODO: Though sanic is not a file server, I feel like we should
# at least make a good effort here. Modified-since is nice, but
# we could also look into etags, expires, and caching
"""
Register a static directory handler with Sanic by adding a route to the
router and registering a handler.
"""
if isinstance(static.file_or_directory, bytes):
file_or_directory = static.file_or_directory.decode("utf-8")
elif isinstance(static.file_or_directory, PurePath):
file_or_directory = str(static.file_or_directory)
elif not isinstance(static.file_or_directory, str):
raise ValueError("Invalid file path string.")
else:
file_or_directory = static.file_or_directory
uri = static.uri
name = static.name
# If we're not trying to match a file directly,
# serve from the folder
if not static.resource_type:
if not path.isfile(file_or_directory):
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "dir":
if path.isfile(file_or_directory):
raise TypeError(
"Resource type improperly identified as directory. "
f"'{file_or_directory}'"
)
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "file" and not path.isfile(
file_or_directory
):
raise TypeError(
"Resource type improperly identified as file. "
f"'{file_or_directory}'"
)
elif static.resource_type != "file":
raise ValueError(
"The resource_type should be set to 'file' or 'dir'"
)
# special prefix for static files
# if not static.name.startswith("_static_"):
# name = f"_static_{static.name}"
_handler = wraps(self._static_request_handler)(
partial(
self._static_request_handler,
file_or_directory=file_or_directory,
use_modified_since=static.use_modified_since,
use_content_range=static.use_content_range,
stream_large_files=static.stream_large_files,
content_type=static.content_type,
directory_handler=static.directory_handler,
)
)
route, _ = self.route( # type: ignore
uri=uri,
methods=["GET", "HEAD"],
name=name,
host=static.host,
strict_slashes=static.strict_slashes,
static=True,
)(_handler)
return route
async def _static_request_handler(
self,
request: Request,
*,
file_or_directory: PathLike,
use_modified_since: bool,
use_content_range: bool,
stream_large_files: Union[bool, int],
directory_handler: DirectoryHandler,
content_type: Optional[str] = None,
__file_uri__: Optional[str] = None,
):
not_found = FileNotFound(
"File not found",
path=file_or_directory,
relative_url=__file_uri__,
)
# Merge served directory and requested file if provided
file_path = await self._get_file_path(
file_or_directory, __file_uri__, not_found
)
try:
headers = {}
# Check if the client has been sent this file before
# and it has not been modified since
stats = None
if use_modified_since:
stats = await stat_async(file_path)
modified_since = stats.st_mtime
response = await validate_file(request.headers, modified_since)
if response:
return response
headers["Last-Modified"] = formatdate(
modified_since, usegmt=True
)
_range = None
if use_content_range:
_range = None
if not stats:
stats = await stat_async(file_path)
headers["Accept-Ranges"] = "bytes"
headers["Content-Length"] = str(stats.st_size)
if request.method != "HEAD":
try:
_range = ContentRangeHandler(request, stats)
except HeaderNotFound:
pass
else:
del headers["Content-Length"]
headers.update(_range.headers)
if "content-type" not in headers:
content_type = (
content_type
or guess_type(file_path)[0]
or DEFAULT_HTTP_CONTENT_TYPE
)
if "charset=" not in content_type and (
content_type.startswith("text/")
or content_type == "application/javascript"
):
content_type += "; charset=utf-8"
headers["Content-Type"] = content_type
if request.method == "HEAD":
return HTTPResponse(headers=headers)
else:
if stream_large_files:
if isinstance(stream_large_files, bool):
threshold = 1024 * 1024
else:
threshold = stream_large_files
if not stats:
stats = await stat_async(file_path)
if stats.st_size >= threshold:
return await file_stream(
file_path, headers=headers, _range=_range
)
return await file(file_path, headers=headers, _range=_range)
except (IsADirectoryError, PermissionError):
return await directory_handler.handle(request, request.path)
except RangeNotSatisfiable:
raise
except FileNotFoundError:
raise not_found
except Exception:
error_logger.exception(
"Exception in static request handler: "
f"path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise
async def _get_file_path(self, file_or_directory, __file_uri__, not_found):
file_path_raw = Path(unquote(file_or_directory))
root_path = file_path = file_path_raw.resolve()
if __file_uri__:
# Strip all / that in the beginning of the URL to help prevent
# python from herping a derp and treating the uri as an
# absolute path
unquoted_file_uri = unquote(__file_uri__).lstrip("/")
file_path_raw = Path(file_or_directory, unquoted_file_uri)
file_path = file_path_raw.resolve()
if (
file_path < root_path and not file_path_raw.is_symlink()
) or ".." in file_path_raw.parts:
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
try:
file_path.relative_to(root_path)
except ValueError:
if not file_path_raw.is_symlink():
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={__file_uri__}"
)
raise not_found
return file_path

View File

@@ -1,6 +1,7 @@
from pathlib import PurePath
from pathlib import Path
from typing import Dict, Iterable, List, NamedTuple, Optional, Union
from sanic.handlers.directory import DirectoryHandler
from sanic.models.handler_types import (
ErrorMiddlewareType,
ListenerType,
@@ -46,16 +47,17 @@ class FutureException(NamedTuple):
class FutureStatic(NamedTuple):
uri: str
file_or_directory: Union[str, bytes, PurePath]
file_or_directory: Path
pattern: str
use_modified_since: bool
use_content_range: bool
stream_large_files: bool
stream_large_files: Union[bool, int]
name: str
host: Optional[str]
strict_slashes: Optional[bool]
content_type: Optional[bool]
content_type: Optional[str]
resource_type: Optional[str]
directory_handler: DirectoryHandler
class FutureSignal(NamedTuple):

0
sanic/pages/__init__.py Normal file
View File

70
sanic/pages/base.py Normal file
View File

@@ -0,0 +1,70 @@
from abc import ABC, abstractmethod
from html5tagger import HTML, Builder, Document
from sanic import __version__ as VERSION
from sanic.application.logo import SVG_LOGO_SIMPLE
from sanic.pages.css import CSS
class BasePage(ABC, metaclass=CSS): # no cov
TITLE = "Sanic"
HEADING = None
CSS: str
doc: Builder
def __init__(self, debug: bool = True) -> None:
self.debug = debug
@property
def style(self) -> str:
return self.CSS
def render(self) -> str:
self.doc = Document(self.TITLE, lang="en", id="sanic")
self._head()
self._body()
self._foot()
return str(self.doc)
def _head(self) -> None:
self.doc.style(HTML(self.style))
with self.doc.header:
self.doc.div(self.HEADING or self.TITLE)
def _foot(self) -> None:
with self.doc.footer:
self.doc.div("powered by")
with self.doc.div:
self._sanic_logo()
if self.debug:
self.doc.div(f"Version {VERSION}")
with self.doc.div:
for idx, (title, href) in enumerate(
(
("Docs", "https://sanic.dev"),
("Help", "https://sanic.dev/en/help.html"),
("GitHub", "https://github.com/sanic-org/sanic"),
)
):
if idx > 0:
self.doc(" | ")
self.doc.a(
title,
href=href,
target="_blank",
referrerpolicy="no-referrer",
)
self.doc.div("DEBUG mode")
@abstractmethod
def _body(self) -> None:
...
def _sanic_logo(self) -> None:
self.doc.a(
HTML(SVG_LOGO_SIMPLE),
href="https://sanic.dev",
target="_blank",
referrerpolicy="no-referrer",
)

35
sanic/pages/css.py Normal file
View File

@@ -0,0 +1,35 @@
from abc import ABCMeta
from pathlib import Path
from typing import Optional
CURRENT_DIR = Path(__file__).parent
def _extract_style(maybe_style: Optional[str], name: str) -> str:
if maybe_style is not None:
maybe_path = Path(maybe_style)
if maybe_path.exists():
return maybe_path.read_text(encoding="UTF-8")
return maybe_style
maybe_path = CURRENT_DIR / "styles" / f"{name}.css"
if maybe_path.exists():
return maybe_path.read_text(encoding="UTF-8")
return ""
class CSS(ABCMeta):
"""Cascade stylesheets, i.e. combine all ancestor styles"""
def __new__(cls, name, bases, attrs):
Page = super().__new__(cls, name, bases, attrs)
# Use a locally defined STYLE or the one from styles directory
Page.STYLE = _extract_style(attrs.get("STYLE_FILE"), name)
Page.STYLE += attrs.get("STYLE_APPEND", "")
# Combine with all ancestor styles
Page.CSS = "".join(
Class.STYLE
for Class in reversed(Page.__mro__)
if type(Class) is CSS
)
return Page

View File

@@ -0,0 +1,66 @@
import sys
from typing import Dict, Iterable
from html5tagger import E
from .base import BasePage
if sys.version_info < (3, 8): # no cov
FileInfo = Dict
else:
from typing import TypedDict
class FileInfo(TypedDict):
icon: str
file_name: str
file_access: str
file_size: str
class DirectoryPage(BasePage): # no cov
TITLE = "Directory Viewer"
def __init__(
self, files: Iterable[FileInfo], url: str, debug: bool
) -> None:
super().__init__(debug)
self.files = files
self.url = url
def _body(self) -> None:
with self.doc.main:
self._headline()
files = list(self.files)
if files:
self._file_table(files)
else:
self.doc.p("The folder is empty.")
def _headline(self):
"""Implement a heading with the current path, combined with
breadcrumb links"""
with self.doc.h1(id="breadcrumbs"):
p = self.url.split("/")[:-1]
for i, part in enumerate(p):
path = "/".join(p[: i + 1]) + "/"
with self.doc.a(href=path):
self.doc.span(part, class_="dir").span("/", class_="sep")
def _file_table(self, files: Iterable[FileInfo]):
with self.doc.table(class_="autoindex container"):
for f in files:
self._file_row(**f)
def _file_row(
self,
icon: str,
file_name: str,
file_access: str,
file_size: str,
):
first = E.span(icon, class_="icon").a(file_name, href=file_name)
self.doc.tr.td(first).td(file_size).td(file_access)

109
sanic/pages/error.py Normal file
View File

@@ -0,0 +1,109 @@
from typing import Any, Mapping
import tracerite.html
from html5tagger import E
from tracerite import html_traceback, inspector
from sanic.request import Request
from .base import BasePage
# Avoid showing the request in the traceback variable inspectors
inspector.blacklist_types += (Request,)
ENDUSER_TEXT = """\
We're sorry, but it looks like something went wrong. Please try refreshing \
the page or navigating back to the homepage. If the issue persists, our \
technical team is working to resolve it as soon as possible. We apologize \
for the inconvenience and appreciate your patience.\
"""
class ErrorPage(BasePage):
STYLE_APPEND = tracerite.html.style
def __init__(
self,
debug: bool,
title: str,
text: str,
request: Request,
exc: Exception,
) -> None:
super().__init__(debug)
name = request.app.name.replace("_", " ").strip()
if name.islower():
name = name.title()
self.TITLE = f"Application {name} cannot handle your request"
self.HEADING = E("Application ").strong(name)(
" cannot handle your request"
)
self.title = title
self.text = text
self.request = request
self.exc = exc
self.details_open = not getattr(exc, "quiet", False)
def _head(self) -> None:
self.doc._script(tracerite.html.javascript)
super()._head()
def _body(self) -> None:
debug = self.request.app.debug
route_name = self.request.name or "[route not found]"
with self.doc.main:
self.doc.h1(f"⚠️ {self.title}").p(self.text)
# Show context details if available on the exception
context = getattr(self.exc, "context", None)
if context:
self._key_value_table(
"Issue context", "exception-context", context
)
if not debug:
with self.doc.div(id="enduser"):
self.doc.p(ENDUSER_TEXT).p.a("Front Page", href="/")
return
# Show additional details in debug mode,
# open by default for 500 errors
with self.doc.details(open=self.details_open, class_="smalltext"):
# Show extra details if available on the exception
extra = getattr(self.exc, "extra", None)
if extra:
self._key_value_table(
"Issue extra data", "exception-extra", extra
)
self.doc.summary(
"Details for developers (Sanic debug mode only)"
)
if self.exc:
with self.doc.div(class_="exception-wrapper"):
self.doc.h2(f"Exception in {route_name}:")
self.doc(
html_traceback(self.exc, include_js_css=False)
)
self._key_value_table(
f"{self.request.method} {self.request.path}",
"request-headers",
self.request.headers,
)
def _key_value_table(
self, title: str, table_id: str, data: Mapping[str, Any]
) -> None:
with self.doc.div(class_="key-value-display"):
self.doc.h2(title)
with self.doc.dl(id=table_id, class_="key-value-table smalltext"):
for key, value in data.items():
# Reading values may cause a new exception, so suppress it
try:
value = str(value)
except Exception:
value = E.em("Unable to display value")
self.doc.dt.span(key, class_="nobr key").span(": ").dd(
value
)

View File

@@ -0,0 +1,146 @@
/** BasePage **/
:root {
--sanic: #ff0d68;
--sanic-yellow: #FFE900;
--sanic-background: #efeced;
--sanic-text: #121010;
--sanic-text-lighter: #756169;
--sanic-link: #ff0d68;
--sanic-block-background: #f7f4f6;
--sanic-block-text: #000;
--sanic-block-alt-text: #6b6468;
--sanic-header-background: #272325;
--sanic-header-border: #fff;
--sanic-header-text: #fff;
--sanic-highlight-background: var(--sanic-yellow);
--sanic-highlight-text: var(--sanic-text);
--sanic-tab-background: #f7f4f6;
--sanic-tab-shadow: #f7f6f6;
--sanic-tab-text: #222021;
--sanic-tracerite-var: var(--sanic-text);
--sanic-tracerite-val: #ff0d68;
--sanic-tracerite-type: #6d6a6b;
}
@media (prefers-color-scheme: dark) {
:root {
--sanic-text: #f7f4f6;
--sanic-background: #121010;
--sanic-block-background: #0f0d0e;
--sanic-block-text: #f7f4f6;
--sanic-header-background: #030203;
--sanic-header-border: #000;
--sanic-highlight-text: var(--sanic-background);
--sanic-tab-background: #292728;
--sanic-tab-shadow: #0f0d0e;
--sanic-tab-text: #aea7ab;
}
}
html {
font: 16px sans-serif;
background: var(--sanic-background);
color: var(--sanic-text);
scrollbar-gutter: stable;
overflow: hidden auto;
}
body {
margin: 0;
font-size: 1.25rem;
line-height: 125%;
}
body>* {
padding: 1rem 2vw;
}
@media (max-width: 1000px) {
body>* {
padding: 0.5rem 1.5vw;
}
html {
/* Scale everything by rem of 6px-16px by viewport width */
font-size: calc(6px + 10 * 100vw / 1000);
}
}
main {
/* Make sure the footer is closer to bottom */
min-height: 70vh;
/* Generous padding for readability */
padding: 1rem 2.5rem;
}
.smalltext {
font-size: 1.0rem;
}
.container {
min-width: 600px;
max-width: 1600px;
}
header {
background: var(--sanic-header-background);
color: var(--sanic-header-text);
border-bottom: 1px solid var(--sanic-header-border);
text-align: center;
}
footer {
text-align: center;
display: flex;
flex-direction: column;
font-size: 0.8rem;
margin: 2rem;
line-height: 1.5em;
}
h1 {
text-align: left;
}
a {
text-decoration: none;
color: var(--sanic-link);
}
a:hover,
a:focus {
text-decoration: underline;
outline: none;
}
span.icon {
margin-right: 1rem;
}
#logo-simple {
height: 1.75rem;
padding: 0 0.25rem;
}
@media (prefers-color-scheme: dark) {
#logo-simple path:last-child {
fill: #e1e1e1;
}
}
#sanic pre,
#sanic code {
font-family: "Fira Code",
"Source Code Pro",
Menlo,
Meslo,
Monaco,
Consolas,
Lucida Console,
monospace;
font-size: 0.8rem;
}

View File

@@ -0,0 +1,63 @@
/** DirectoryPage **/
#breadcrumbs>a:hover {
text-decoration: none;
}
#breadcrumbs>a .dir {
padding: 0 0.25em;
}
#breadcrumbs>a:first-child:hover::before,
#breadcrumbs>a .dir:hover {
text-decoration: underline;
}
#breadcrumbs>a:first-child::before {
content: "🏠";
}
#breadcrumbs>a:last-child {
color: #ff0d68;
}
main a {
color: inherit;
font-weight: bold;
}
table.autoindex {
width: 100%;
font-family: monospace;
font-size: 1.25rem;
}
table.autoindex tr {
display: flex;
}
table.autoindex tr:hover {
background-color: #ddd;
}
table.autoindex td {
margin: 0 0.5rem;
}
table.autoindex td:first-child {
flex: 1;
}
table.autoindex td:nth-child(2) {
text-align: right;
}
table.autoindex td:last-child {
text-align: right;
}
@media (prefers-color-scheme: dark) {
table.autoindex tr:hover {
background-color: #222;
}
}

View File

@@ -0,0 +1,108 @@
/** ErrorPage **/
#enduser {
max-width: 30em;
margin: 5em auto 5em auto;
text-align: justify;
/*text-justify: both;*/
}
#enduser a {
color: var(--sanic-blue);
}
#enduser p:last-child {
text-align: right;
}
summary {
margin-top: 3em;
color: var(--sanic-text-lighter);
cursor: pointer;
}
.tracerite {
--tracerite-var: var(--sanic-tracerite-var);
--tracerite-val: var(--sanic-tracerite-val);
--tracerite-type: var(--sanic-tracerite-type);
--tracerite-exception: var(--sanic);
--tracerite-highlight: var(--sanic-yellow);
--tracerite-tab: var(--sanic-tab-background);
--tracerite-tab-text: var(--sanic-tab-text);
}
.tracerite>h3 {
margin: 0.5rem 0 !important;
}
#sanic .tracerite .traceback-labels button {
font-size: 0.8rem;
line-height: 120%;
background: var(--tracerite-tab);
color: var(--tracerite-tab-text);
transition: 0.3s;
cursor: pointer;
}
.tracerite .traceback-labels {
padding-top: 5px;
}
.tracerite .traceback-labels button:hover {
filter: contrast(150%) brightness(120%) drop-shadow(0 -0 2px var(--sanic-tab-shadow));
}
#sanic .tracerite .tracerite-tooltip::before {
bottom: 1.75em;
}
#sanic .tracerite .traceback-details mark span {
background: var(--sanic-highlight-background);
color: var(--sanic-highlight-text);
}
header {
background: var(--sanic-header-background);
}
h2 {
font-size: 1.3rem;
color: var(--sanic-text);
}
.key-value-display,
.exception-wrapper {
padding: 0.5rem;
margin-top: 1rem;
}
.key-value-display {
background-color: var(--sanic-block-background);
color: var(--sanic-block-text);
}
.key-value-display h2 {
margin-bottom: 0.2em;
}
dl.key-value-table {
width: 100%;
margin: 0;
display: grid;
grid-template-columns: 1fr 5fr;
grid-gap: .3em;
white-space: pre-wrap;
}
dl.key-value-table * {
margin: 0;
}
dl.key-value-table dt {
color: var(--sanic-block-alt-text);
word-break: break-word;
}
dl.key-value-table dd {
/* Better breaking for cookies header and such */
word-break: break-all;
}

File diff suppressed because it is too large Load Diff

View File

@@ -232,7 +232,7 @@ class JSONResponse(HTTPResponse):
body: Optional[Any] = None,
status: int = 200,
headers: Optional[Union[Header, Dict[str, str]]] = None,
content_type: str = "application/json",
content_type: Optional[str] = None,
dumps: Optional[Callable[..., str]] = None,
**kwargs: Any,
):

View File

@@ -39,13 +39,13 @@ class Router(BaseRouter):
extra={"host": host} if host else None,
)
except RoutingNotFound as e:
raise NotFound("Requested URL {} not found".format(e.path))
raise NotFound(f"Requested URL {e.path} not found") from None
except NoMethod as e:
raise MethodNotAllowed(
"Method {} not allowed for URL {}".format(method, path),
f"Method {method} not allowed for URL {path}",
method=method,
allowed_methods=e.allowed_methods,
)
) from None
@lru_cache(maxsize=ROUTER_CACHE_SIZE)
def get( # type: ignore
@@ -61,6 +61,7 @@ class Router(BaseRouter):
correct response
:rtype: Tuple[ Route, RouteHandler, Dict[str, Any]]
"""
__tracebackhide__ = True
return self._get(path, method, host)
def add( # type: ignore

View File

@@ -45,7 +45,7 @@ class WebSocketConnection:
await self._send(message)
async def recv(self, *args, **kwargs) -> Optional[str]:
async def recv(self, *args, **kwargs) -> Optional[Union[str, bytes]]:
message = await self._receive()
if message["type"] == "websocket.receive":
@@ -53,7 +53,7 @@ class WebSocketConnection:
return message["text"]
except KeyError:
try:
return message["bytes"].decode()
return message["bytes"]
except KeyError:
raise InvalidUsage("Bad ASGI message received")
elif message["type"] == "websocket.disconnect":

View File

@@ -2,7 +2,6 @@ from pathlib import Path
from sanic import Sanic
from sanic.exceptions import SanicException
from sanic.response import redirect
def create_simple_server(directory: Path):
@@ -12,10 +11,8 @@ def create_simple_server(directory: Path):
)
app = Sanic("SimpleServer")
app.static("/", directory, name="main")
@app.get("/")
def index(_):
return redirect(app.url_for("main", filename="index.html"))
app.static(
"/", directory, name="main", directory_view=True, index="index.html"
)
return app

View File

@@ -1,2 +1,4 @@
[flake8]
ignore = E203, W503
per-file-ignores =
sanic/app.py:E402

View File

@@ -35,6 +35,7 @@ def open_local(paths, mode="r", encoding="utf8"):
return codecs.open(path, mode, encoding)
def str_to_bool(val: str) -> bool:
val = val.lower()
if val in {
@@ -55,6 +56,7 @@ def str_to_bool(val: str) -> bool:
else:
raise ValueError(f"Invalid truth value {val}")
with open_local(["sanic", "__version__.py"], encoding="latin1") as fp:
try:
version = re.findall(
@@ -79,7 +81,7 @@ setup_kwargs = {
),
"long_description": long_description,
"packages": find_packages(exclude=("tests", "tests.*")),
"package_data": {"sanic": ["py.typed"]},
"package_data": {"sanic": ["py.typed", "pages/styles/*"]},
"platforms": "any",
"python_requires": ">=3.7",
"classifiers": [
@@ -109,6 +111,8 @@ requirements = [
"aiofiles>=0.6.0",
"websockets>=10.0",
"multidict>=5.0,<7.0",
"html5tagger>=1.2.1",
"tracerite>=1.0.0",
]
tests_require = [

View File

@@ -1,5 +1,7 @@
import asyncio
import inspect
import logging
import os
import random
import re
import string
@@ -58,7 +60,6 @@ CACHE: Dict[str, Any] = {}
class RouteStringGenerator:
ROUTE_COUNT_PER_DEPTH = 100
HTTP_METHODS = HTTP_METHODS
ROUTE_PARAM_TYPES = ["str", "int", "float", "alpha", "uuid"]
@@ -232,3 +233,12 @@ def urlopen():
urlopen.read = Mock()
with patch("sanic.cli.inspector_client.urlopen", urlopen):
yield urlopen
@pytest.fixture(scope="module")
def static_file_directory():
"""The static directory to serve"""
current_file = inspect.getfile(inspect.currentframe())
current_directory = os.path.dirname(os.path.abspath(current_file))
static_directory = os.path.join(current_directory, "static")
return static_directory

View File

@@ -36,6 +36,7 @@ def test_app_loop_running(app: Sanic):
assert response.text == "pass"
@pytest.mark.asyncio
def test_create_asyncio_server(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(return_asyncio_server=True)
@@ -44,6 +45,7 @@ def test_create_asyncio_server(app: Sanic):
assert srv.is_serving() is True
@pytest.mark.asyncio
def test_asyncio_server_no_start_serving(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
@@ -55,6 +57,7 @@ def test_asyncio_server_no_start_serving(app: Sanic):
assert srv.is_serving() is False
@pytest.mark.asyncio
def test_asyncio_server_start_serving(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
@@ -72,6 +75,7 @@ def test_asyncio_server_start_serving(app: Sanic):
# Looks like we can't easily test `serve_forever()`
@pytest.mark.asyncio
def test_create_server_main(app: Sanic, caplog):
app.listener("main_process_start")(lambda *_: ...)
loop = asyncio.get_event_loop()
@@ -86,6 +90,7 @@ def test_create_server_main(app: Sanic, caplog):
) in caplog.record_tuples
@pytest.mark.asyncio
def test_create_server_no_startup(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
@@ -101,6 +106,7 @@ def test_create_server_no_startup(app: Sanic):
loop.run_until_complete(srv.start_serving())
@pytest.mark.asyncio
def test_create_server_main_convenience(app: Sanic, caplog):
app.main_process_start(lambda *_: ...)
loop = asyncio.get_event_loop()
@@ -126,7 +132,6 @@ def test_app_loop_not_running(app: Sanic):
def test_app_run_raise_type_error(app: Sanic):
with pytest.raises(TypeError) as excinfo:
app.run(loop="loop")
@@ -139,7 +144,6 @@ def test_app_run_raise_type_error(app: Sanic):
def test_app_route_raise_value_error(app: Sanic):
with pytest.raises(ValueError) as excinfo:
@app.route("/test")
@@ -221,7 +225,6 @@ def test_app_websocket_parameters(websocket_protocol_mock, app: Sanic):
def test_handle_request_with_nested_exception(app: Sanic, monkeypatch):
err_msg = "Mock Exception"
def mock_error_handler_response(*args, **kwargs):
@@ -241,7 +244,6 @@ def test_handle_request_with_nested_exception(app: Sanic, monkeypatch):
def test_handle_request_with_nested_exception_debug(app: Sanic, monkeypatch):
err_msg = "Mock Exception"
def mock_error_handler_response(*args, **kwargs):
@@ -470,6 +472,7 @@ def test_uvloop_config(app: Sanic, monkeypatch, use):
try_use_uvloop.assert_not_called()
@pytest.mark.asyncio
def test_uvloop_cannot_never_called_with_create_server(caplog, monkeypatch):
apps = (Sanic("default-uvloop"), Sanic("no-uvloop"), Sanic("yes-uvloop"))
@@ -506,6 +509,7 @@ def test_uvloop_cannot_never_called_with_create_server(caplog, monkeypatch):
assert counter[(logging.WARNING, message)] == modified
@pytest.mark.asyncio
def test_multiple_uvloop_configs_display_warning(caplog):
Sanic._uvloop_setting = None # Reset the setting (changed in prev tests)

View File

@@ -342,7 +342,7 @@ async def test_websocket_send(send, receive, message_stack):
@pytest.mark.asyncio
async def test_websocket_receive(send, receive, message_stack):
async def test_websocket_text_receive(send, receive, message_stack):
msg = {"text": "hello", "type": "websocket.receive"}
message_stack.append(msg)
@@ -352,6 +352,17 @@ async def test_websocket_receive(send, receive, message_stack):
assert text == msg["text"]
@pytest.mark.asyncio
async def test_websocket_bytes_receive(send, receive, message_stack):
msg = {"bytes": b"hello", "type": "websocket.receive"}
message_stack.append(msg)
ws = WebSocketConnection(send, receive)
data = await ws.receive()
assert data == msg["bytes"]
@pytest.mark.asyncio
async def test_websocket_accept_with_no_subprotocols(
send, receive, message_stack

View File

@@ -148,7 +148,6 @@ def test_cookie_set_unknown_property():
def test_cookie_set_same_key(app):
cookies = {"test": "wait"}
@app.get("/")

View File

@@ -1,12 +1,14 @@
import logging
import pytest
from sanic import Sanic
from sanic.config import Config
from sanic.errorpages import HTMLRenderer, exception_response
from sanic.errorpages import TextRenderer, exception_response, guess_mime
from sanic.exceptions import NotFound, SanicException
from sanic.handlers import ErrorHandler
from sanic.request import Request
from sanic.response import HTTPResponse, html, json, text
from sanic.response import HTTPResponse, empty, html, json, text
@pytest.fixture
@@ -17,6 +19,44 @@ def app():
def err(request):
raise Exception("something went wrong")
@app.get("/forced_json/<fail>", error_format="json")
def manual_fail(request, fail):
if fail == "fail":
raise Exception
return html("") # Should be ignored
@app.get("/empty/<fail>")
def empty_fail(request, fail):
if fail == "fail":
raise Exception
return empty()
@app.get("/json/<fail>")
def json_fail(request, fail):
if fail == "fail":
raise Exception
# After 23.3 route format should become json, older versions think it
# is mixed due to empty mapping to html, and don't find any format.
return json({"foo": "bar"}) if fail == "json" else empty()
@app.get("/html/<fail>")
def html_fail(request, fail):
if fail == "fail":
raise Exception
return html("<h1>foo</h1>")
@app.get("/text/<fail>")
def text_fail(request, fail):
if fail == "fail":
raise Exception
return text("foo")
@app.get("/mixed/<param>")
def mixed_fail(request, param):
if param not in ("json", "html"):
raise Exception
return json({}) if param == "json" else html("")
return app
@@ -28,14 +68,14 @@ def fake_request(app):
@pytest.mark.parametrize(
"fallback,content_type, exception, status",
(
(None, "text/html; charset=utf-8", Exception, 500),
(None, "text/plain; charset=utf-8", Exception, 500),
("html", "text/html; charset=utf-8", Exception, 500),
("auto", "text/html; charset=utf-8", Exception, 500),
("auto", "text/plain; charset=utf-8", Exception, 500),
("text", "text/plain; charset=utf-8", Exception, 500),
("json", "application/json", Exception, 500),
(None, "text/html; charset=utf-8", NotFound, 404),
(None, "text/plain; charset=utf-8", NotFound, 404),
("html", "text/html; charset=utf-8", NotFound, 404),
("auto", "text/html; charset=utf-8", NotFound, 404),
("auto", "text/plain; charset=utf-8", NotFound, 404),
("text", "text/plain; charset=utf-8", NotFound, 404),
("json", "application/json", NotFound, 404),
),
@@ -43,6 +83,10 @@ def fake_request(app):
def test_should_return_html_valid_setting(
fake_request, fallback, content_type, exception, status
):
# Note: if fallback is None or "auto", prior to PR #2668 base was returned
# and after that a text response is given because it matches */*. Changed
# base to TextRenderer in this test, like it is in Sanic itself, so the
# test passes with either version but still covers everything that it did.
if fallback:
fake_request.app.config.FALLBACK_ERROR_FORMAT = fallback
@@ -53,7 +97,7 @@ def test_should_return_html_valid_setting(
fake_request,
e,
True,
base=HTMLRenderer,
base=TextRenderer,
fallback=fake_request.app.config.FALLBACK_ERROR_FORMAT,
)
@@ -259,15 +303,16 @@ def test_fallback_with_content_type_mismatch_accept(app):
"accept,content_type,expected",
(
(None, None, "text/plain; charset=utf-8"),
("foo/bar", None, "text/html; charset=utf-8"),
("foo/bar", None, "text/plain; charset=utf-8"),
("application/json", None, "application/json"),
("application/json,text/plain", None, "application/json"),
("text/plain,application/json", None, "application/json"),
("text/plain,foo/bar", None, "text/plain; charset=utf-8"),
# Following test is valid after v22.3
# ("text/plain,text/html", None, "text/plain; charset=utf-8"),
("*/*", "foo/bar", "text/html; charset=utf-8"),
("text/plain,text/html", None, "text/plain; charset=utf-8"),
("*/*", "foo/bar", "text/plain; charset=utf-8"),
("*/*", "application/json", "application/json"),
# App wants text/plain but accept has equal entries for it
("text/*,*/plain", None, "text/plain; charset=utf-8"),
),
)
def test_combinations_for_auto(fake_request, accept, content_type, expected):
@@ -286,7 +331,7 @@ def test_combinations_for_auto(fake_request, accept, content_type, expected):
fake_request,
e,
True,
base=HTMLRenderer,
base=TextRenderer,
fallback="auto",
)
@@ -376,3 +421,109 @@ def test_config_fallback_bad_value(app):
message = "Unknown format: fake"
with pytest.raises(SanicException, match=message):
app.config.FALLBACK_ERROR_FORMAT = "fake"
@pytest.mark.parametrize(
"route_format,fallback,accept,expected",
(
(
"json",
"html",
"*/*",
"The client accepts */*, using 'json' from fakeroute",
),
(
"json",
"auto",
"text/html,*/*;q=0.8",
"The client accepts text/html, using 'html' from any",
),
(
"json",
"json",
"text/html,*/*;q=0.8",
"The client accepts */*;q=0.8, using 'json' from fakeroute",
),
(
"",
"html",
"text/*,*/plain",
"The client accepts text/*, using 'html' from FALLBACK_ERROR_FORMAT",
),
(
"",
"json",
"text/*,*/*",
"The client accepts */*, using 'json' from FALLBACK_ERROR_FORMAT",
),
(
"",
"auto",
"*/*,application/json;q=0.5",
"The client accepts */*, using 'json' from request.accept",
),
(
"",
"auto",
"*/*",
"The client accepts */*, using 'json' from content-type",
),
(
"",
"auto",
"text/html,text/plain",
"The client accepts text/plain, using 'text' from any",
),
(
"",
"auto",
"text/html,text/plain;q=0.9",
"The client accepts text/html, using 'html' from any",
),
(
"html",
"json",
"application/xml",
"No format found, the client accepts [application/xml]",
),
("", "auto", "*/*", "The client accepts */*, using 'text' from any"),
("", "", "*/*", "No format found, the client accepts [*/*]"),
# DEPRECATED: remove in 24.3
(
"",
"auto",
"*/*",
"The client accepts */*, using 'json' from request.json",
),
),
)
def test_guess_mime_logging(
caplog, fake_request, route_format, fallback, accept, expected
):
class FakeObject:
pass
fake_request.route = FakeObject()
fake_request.route.name = "fakeroute"
fake_request.route.extra = FakeObject()
fake_request.route.extra.error_format = route_format
if accept is None:
del fake_request.headers["accept"]
else:
fake_request.headers["accept"] = accept
if "content-type" in expected:
fake_request.headers["content-type"] = "application/json"
# Fake JSON content (DEPRECATED: remove in 24.3)
if "request.json" in expected:
fake_request.parsed_json = {"foo": "bar"}
with caplog.at_level(logging.DEBUG, logger="sanic.root"):
guess_mime(fake_request, fallback)
(logmsg,) = [
r.message for r in caplog.records if r.funcName == "guess_mime"
]
assert logmsg == expected

View File

@@ -23,11 +23,11 @@ from sanic.exceptions import (
from sanic.response import text
def dl_to_dict(soup, css_class):
def dl_to_dict(soup, dl_id):
keys, values = [], []
for dl in soup.find_all("dl", {"class": css_class}):
for dl in soup.find_all("dl", {"id": dl_id}):
for dt in dl.find_all("dt"):
keys.append(dt.text.strip())
keys.append(dt.text.split(":", 1)[0])
for dd in dl.find_all("dd"):
values.append(dd.text.strip())
return dict(zip(keys, values))
@@ -194,10 +194,7 @@ def test_handled_unhandled_exception(exception_app):
assert "Internal Server Error" in soup.h1.text
message = " ".join(soup.p.text.split())
assert message == (
"The server encountered an internal error and "
"cannot complete your request."
)
assert "The application encountered an unexpected error" in message
def test_exception_in_exception_handler(exception_app):
@@ -299,7 +296,7 @@ def test_contextual_exception_context(debug):
_, response = app.test_client.post("/coffee/html", debug=debug)
soup = BeautifulSoup(response.body, "html.parser")
dl = dl_to_dict(soup, "context")
dl = dl_to_dict(soup, "exception-context")
assert response.status == 418
assert "Sorry, I cannot brew coffee" in soup.find("p").text
assert dl == {"foo": "bar"}
@@ -340,7 +337,7 @@ def test_contextual_exception_extra(debug):
_, response = app.test_client.post("/coffee/html", debug=debug)
soup = BeautifulSoup(response.body, "html.parser")
dl = dl_to_dict(soup, "extra")
dl = dl_to_dict(soup, "exception-extra")
assert response.status == 418
assert "Found bar" in soup.find("p").text
if debug:

View File

@@ -62,7 +62,6 @@ def exception_handler_app():
@exception_handler_app.route("/8", error_format="html")
def handler_8(request):
raise ErrorWithRequestCtx("OK")
@exception_handler_app.exception(ErrorWithRequestCtx, NotFound)
@@ -124,10 +123,10 @@ def test_html_traceback_output_in_debug_mode(exception_handler_app: Sanic):
assert "handler_4" in html
assert "foo = bar" in html
summary_text = " ".join(soup.select(".summary")[0].text.split())
assert (
"NameError: name 'bar' is not defined while handling path /4"
) == summary_text
summary_text = soup.select("h3")[0].text
assert "NameError: name 'bar' is not defined" == summary_text
request_text = soup.select("h2")[-1].text
assert "GET /4" == request_text
def test_inherited_exception_handler(exception_handler_app: Sanic):
@@ -147,11 +146,10 @@ def test_chained_exception_handler(exception_handler_app: Sanic):
assert "handler_6" in html
assert "foo = 1 / arg" in html
assert "ValueError" in html
assert "GET /6" in html
summary_text = " ".join(soup.select(".summary")[0].text.split())
assert (
"ZeroDivisionError: division by zero while handling path /6/0"
) == summary_text
summary_text = soup.select("h3")[0].text
assert "ZeroDivisionError: division by zero" == summary_text
def test_exception_handler_lookup(exception_handler_app: Sanic):
@@ -214,7 +212,7 @@ def test_error_handler_noisy_log(
exception_handler_app: Sanic, monkeypatch: MonkeyPatch
):
err_logger = Mock()
monkeypatch.setattr(handlers, "error_logger", err_logger)
monkeypatch.setattr(handlers.error, "error_logger", err_logger)
exception_handler_app.config["NOISY_EXCEPTIONS"] = False
exception_handler_app.test_client.get("/1")

View File

@@ -2,12 +2,16 @@ from unittest.mock import Mock
import pytest
from sanic import headers, text
from sanic import Sanic, headers, json, text
from sanic.exceptions import InvalidHeader, PayloadTooLarge
from sanic.http import Http
from sanic.request import Request
def make_request(headers) -> Request:
return Request(b"/", headers, "1.1", "GET", None, None)
@pytest.fixture
def raised_ceiling():
Http.HEADER_CEILING = 32_768
@@ -45,29 +49,17 @@ def raised_ceiling():
("attachment", {"filename": "strange;name", "size": "123"}),
),
(
'form-data; name="files"; filename="fo\\"o;bar\\"',
("form-data", {"name": "files", "filename": 'fo"o;bar\\'})
# cgi.parse_header:
# ('form-data', {'name': 'files', 'filename': 'fo"o;bar\\'})
# werkzeug.parse_options_header:
# ('form-data', {'name': 'files', 'filename': '"fo\\"o', 'bar\\"': None})
'form-data; name="foo"; value="%22\\%0D%0A"',
("form-data", {"name": "foo", "value": '"\\\n'}),
),
# <input type=file name="foo&quot;;bar\"> with Unicode filename!
(
# Chrome:
# Chrome, Firefox:
# Content-Disposition: form-data; name="foo%22;bar\"; filename="😀"
'form-data; name="foo%22;bar\\"; filename="😀"',
("form-data", {"name": 'foo";bar\\', "filename": "😀"})
# cgi: ('form-data', {'name': 'foo%22;bar"; filename="😀'})
# werkzeug: ('form-data', {'name': 'foo%22;bar"; filename='})
),
(
# Firefox:
# Content-Disposition: form-data; name="foo\";bar\"; filename="😀"
'form-data; name="foo\\";bar\\"; filename="😀"',
("form-data", {"name": 'foo";bar\\', "filename": "😀"})
# cgi: ('form-data', {'name': 'foo";bar"; filename="😀'})
# werkzeug: ('form-data', {'name': 'foo";bar"; filename='})
# werkzeug (pre 2.3.0): ('form-data', {'name': 'foo%22;bar"; filename='})
),
],
)
@@ -187,27 +179,24 @@ def test_request_line(app):
@pytest.mark.parametrize(
"raw",
"raw,expected_subtype",
(
"show/first, show/second",
"show/*, show/first",
"*/*, show/first",
"*/*, show/*",
"other/*; q=0.1, show/*; q=0.2",
"show/first; q=0.5, show/second; q=0.5",
"show/first; foo=bar, show/second; foo=bar",
"show/second, show/first; foo=bar",
"show/second; q=0.5, show/first; foo=bar; q=0.5",
"show/second; q=0.5, show/first; q=1.0",
"show/first, show/second; q=1.0",
("show/first, show/second", "first"),
("show/*, show/first", "first"),
("*/*, show/first", "first"),
("*/*, show/*", "*"),
("other/*; q=0.1, show/*; q=0.2", "*"),
("show/first; q=0.5, show/second; q=0.5", "first"),
("show/first; foo=bar, show/second; foo=bar", "first"),
("show/second, show/first; foo=bar", "first"),
("show/second; q=0.5, show/first; foo=bar; q=0.5", "first"),
("show/second; q=0.5, show/first; q=1.0", "first"),
("show/first, show/second; q=1.0", "second"),
),
)
def test_parse_accept_ordered_okay(raw):
def test_parse_accept_ordered_okay(raw, expected_subtype):
ordered = headers.parse_accept(raw)
expected_subtype = (
"*" if all(q.subtype.is_wildcard for q in ordered) else "first"
)
assert ordered[0].type_ == "show"
assert ordered[0].type == "show"
assert ordered[0].subtype == expected_subtype
@@ -217,6 +206,7 @@ def test_parse_accept_ordered_okay(raw):
"missing",
"missing/",
"/missing",
"/",
),
)
def test_bad_accept(raw):
@@ -225,128 +215,83 @@ def test_bad_accept(raw):
def test_empty_accept():
assert headers.parse_accept("") == []
a = headers.parse_accept("")
assert a == []
assert not a.match("*/*")
def test_wildcard_accept_set_ok():
accept = headers.parse_accept("*/*")[0]
assert accept.type_.is_wildcard
assert accept.subtype.is_wildcard
assert accept.type == "*"
assert accept.subtype == "*"
assert accept.has_wildcard
accept = headers.parse_accept("foo/*")[0]
assert accept.type == "foo"
assert accept.subtype == "*"
assert accept.has_wildcard
accept = headers.parse_accept("foo/bar")[0]
assert not accept.type_.is_wildcard
assert not accept.subtype.is_wildcard
assert accept.type == "foo"
assert accept.subtype == "bar"
assert not accept.has_wildcard
def test_accept_parsed_against_str():
accept = headers.Accept.parse("foo/bar")
assert accept > "foo/bar; q=0.1"
def test_media_type_equality():
assert headers.MediaType("foo") == headers.MediaType("foo") == "foo"
assert headers.MediaType("foo") == headers.MediaType("*") == "*"
assert headers.MediaType("foo") != headers.MediaType("bar")
assert headers.MediaType("foo") != "bar"
accept = headers.Matched.parse("foo/bar")
assert accept == "foo/bar; q=0.1"
def test_media_type_matching():
assert headers.MediaType("foo").match(headers.MediaType("foo"))
assert headers.MediaType("foo").match("foo")
assert not headers.MediaType("foo").match(headers.MediaType("*"))
assert not headers.MediaType("foo").match("*")
assert not headers.MediaType("foo").match(headers.MediaType("bar"))
assert not headers.MediaType("foo").match("bar")
assert headers.MediaType("foo", "bar").match(
headers.MediaType("foo", "bar")
)
assert headers.MediaType("foo", "bar").match("foo/bar")
@pytest.mark.parametrize(
"value,other,outcome,allow_type,allow_subtype",
"value,other,outcome",
(
# ALLOW BOTH
("foo/bar", "foo/bar", True, True, True),
("foo/bar", headers.Accept.parse("foo/bar"), True, True, True),
("foo/bar", "foo/*", True, True, True),
("foo/bar", headers.Accept.parse("foo/*"), True, True, True),
("foo/bar", "*/*", True, True, True),
("foo/bar", headers.Accept.parse("*/*"), True, True, True),
("foo/*", "foo/bar", True, True, True),
("foo/*", headers.Accept.parse("foo/bar"), True, True, True),
("foo/*", "foo/*", True, True, True),
("foo/*", headers.Accept.parse("foo/*"), True, True, True),
("foo/*", "*/*", True, True, True),
("foo/*", headers.Accept.parse("*/*"), True, True, True),
("*/*", "foo/bar", True, True, True),
("*/*", headers.Accept.parse("foo/bar"), True, True, True),
("*/*", "foo/*", True, True, True),
("*/*", headers.Accept.parse("foo/*"), True, True, True),
("*/*", "*/*", True, True, True),
("*/*", headers.Accept.parse("*/*"), True, True, True),
# ALLOW TYPE
("foo/bar", "foo/bar", True, True, False),
("foo/bar", headers.Accept.parse("foo/bar"), True, True, False),
("foo/bar", "foo/*", False, True, False),
("foo/bar", headers.Accept.parse("foo/*"), False, True, False),
("foo/bar", "*/*", False, True, False),
("foo/bar", headers.Accept.parse("*/*"), False, True, False),
("foo/*", "foo/bar", False, True, False),
("foo/*", headers.Accept.parse("foo/bar"), False, True, False),
("foo/*", "foo/*", False, True, False),
("foo/*", headers.Accept.parse("foo/*"), False, True, False),
("foo/*", "*/*", False, True, False),
("foo/*", headers.Accept.parse("*/*"), False, True, False),
("*/*", "foo/bar", False, True, False),
("*/*", headers.Accept.parse("foo/bar"), False, True, False),
("*/*", "foo/*", False, True, False),
("*/*", headers.Accept.parse("foo/*"), False, True, False),
("*/*", "*/*", False, True, False),
("*/*", headers.Accept.parse("*/*"), False, True, False),
# ALLOW SUBTYPE
("foo/bar", "foo/bar", True, False, True),
("foo/bar", headers.Accept.parse("foo/bar"), True, False, True),
("foo/bar", "foo/*", True, False, True),
("foo/bar", headers.Accept.parse("foo/*"), True, False, True),
("foo/bar", "*/*", False, False, True),
("foo/bar", headers.Accept.parse("*/*"), False, False, True),
("foo/*", "foo/bar", True, False, True),
("foo/*", headers.Accept.parse("foo/bar"), True, False, True),
("foo/*", "foo/*", True, False, True),
("foo/*", headers.Accept.parse("foo/*"), True, False, True),
("foo/*", "*/*", False, False, True),
("foo/*", headers.Accept.parse("*/*"), False, False, True),
("*/*", "foo/bar", False, False, True),
("*/*", headers.Accept.parse("foo/bar"), False, False, True),
("*/*", "foo/*", False, False, True),
("*/*", headers.Accept.parse("foo/*"), False, False, True),
("*/*", "*/*", False, False, True),
("*/*", headers.Accept.parse("*/*"), False, False, True),
("foo/bar", "foo/bar", True),
("foo/bar", headers.Matched.parse("foo/bar"), True),
("foo/bar", "foo/*", True),
("foo/bar", headers.Matched.parse("foo/*"), True),
("foo/bar", "*/*", True),
("foo/bar", headers.Matched.parse("*/*"), True),
("foo/*", "foo/bar", True),
("foo/*", headers.Matched.parse("foo/bar"), True),
("foo/*", "foo/*", True),
("foo/*", headers.Matched.parse("foo/*"), True),
("foo/*", "*/*", True),
("foo/*", headers.Matched.parse("*/*"), True),
("*/*", "foo/bar", True),
("*/*", headers.Matched.parse("foo/bar"), True),
("*/*", "foo/*", True),
("*/*", headers.Matched.parse("foo/*"), True),
("*/*", "*/*", True),
("*/*", headers.Matched.parse("*/*"), True),
),
)
def test_accept_matching(value, other, outcome, allow_type, allow_subtype):
assert (
headers.Accept.parse(value).match(
other,
allow_type_wildcard=allow_type,
allow_subtype_wildcard=allow_subtype,
)
is outcome
)
def test_accept_matching(value, other, outcome):
assert bool(headers.Matched.parse(value).match(other)) is outcome
@pytest.mark.parametrize("value", ("foo/bar", "foo/*", "*/*"))
def test_value_in_accept(value):
acceptable = headers.parse_accept(value)
assert "foo/bar" in acceptable
assert "foo/*" in acceptable
assert "*/*" in acceptable
assert acceptable.match("foo/bar")
assert acceptable.match("foo/*")
assert acceptable.match("*/*")
@pytest.mark.parametrize("value", ("foo/bar", "foo/*"))
def test_value_not_in_accept(value):
acceptable = headers.parse_accept(value)
assert "no/match" not in acceptable
assert "no/*" not in acceptable
assert not acceptable.match("no/match")
assert not acceptable.match("no/*")
assert "*/*" not in acceptable
assert "*/bar" not in acceptable
@pytest.mark.parametrize(
@@ -365,6 +310,160 @@ def test_value_not_in_accept(value):
),
),
)
def test_browser_headers(header, expected):
def test_browser_headers_general(header, expected):
request = Request(b"/", {"accept": header}, "1.1", "GET", None, None)
assert request.accept == expected
assert [str(item) for item in request.accept] == expected
@pytest.mark.parametrize(
"header,expected",
(
(
"text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", # noqa: E501
[
("text/html", 1.0),
("application/xhtml+xml", 1.0),
("image/avif", 1.0),
("image/webp", 1.0),
("application/xml", 0.9),
("*/*", 0.8),
],
),
),
)
def test_browser_headers_specific(header, expected):
mimes = [e[0] for e in expected]
qs = [e[1] for e in expected]
request = Request(b"/", {"accept": header}, "1.1", "GET", None, None)
assert request.accept == mimes
for a, m, q in zip(request.accept, mimes, qs):
assert a == m
assert a.mime == m
assert a.q == q
@pytest.mark.parametrize(
"raw",
(
"text/html, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8",
"application/xml;q=0.9, */*;q=0.8, text/html, application/xhtml+xml",
(
"foo/bar;q=0.9, */*;q=0.8, text/html=0.8, "
"text/plain, application/xhtml+xml"
),
),
)
def test_accept_ordering(raw):
"""Should sort by q but also be stable."""
accept = headers.parse_accept(raw)
assert accept[0].type == "text"
raw1 = ", ".join(str(a) for a in accept)
accept = headers.parse_accept(raw1)
raw2 = ", ".join(str(a) for a in accept)
assert raw1 == raw2
def test_not_accept_wildcard():
accept = headers.parse_accept("*/*, foo/*, */bar, foo/bar;q=0.1")
assert not accept.match(
"text/html", "foo/foo", "bar/bar", accept_wildcards=False
)
# Should ignore wildcards in accept but still matches them from mimes
m = accept.match("text/plain", "*/*", accept_wildcards=False)
assert m.mime == "*/*"
assert m.match("*/*")
assert m.header == "foo/bar"
assert not accept.match(
"text/html", "foo/foo", "bar/bar", accept_wildcards=False
)
def test_accept_misc():
header = (
"foo/bar;q=0.0, */plain;param=123, text/plain, text/*, foo/bar;q=0.5"
)
a = headers.parse_accept(header)
assert repr(a) == (
"[*/plain;param=123, text/plain, text/*, "
"foo/bar;q=0.5, foo/bar;q=0.0]"
) # noqa: E501
assert str(a) == (
"*/plain;param=123, text/plain, text/*, "
"foo/bar;q=0.5, foo/bar;q=0.0"
) # noqa: E501
# q=1 types don't match foo/bar but match the two others,
# text/* comes first and matches */plain because it
# comes first in the header
m = a.match("foo/bar", "text/*", "text/plain")
assert repr(m) == "<text/* matched */plain;param=123>"
assert m == "text/*"
assert m.mime == "text/*"
assert m.header.mime == "*/plain"
assert m.header.type == "*"
assert m.header.subtype == "plain"
assert m.header.q == 1.0
assert m.header.params == dict(param="123")
# Matches object against another Matched object (by mime and header)
assert m == a.match("text/*")
# Against unsupported type falls back to object id matching
assert m != 123
# Matches the highest q value
m = a.match("foo/bar")
assert repr(m) == "<foo/bar matched foo/bar;q=0.5>"
assert m == "foo/bar"
assert m == "foo/bar;q=0.5"
# Matching nothing special case
m = a.match()
assert m == ""
assert m.header is None
# No header means anything
a = headers.parse_accept(None)
assert a == ["*/*"]
assert a.match("foo/bar")
# Empty header means nothing
a = headers.parse_accept("")
assert a == []
assert not a.match("foo/bar")
@pytest.mark.parametrize(
"headers,expected",
(
({"foo": "bar"}, "bar"),
((("foo", "bar"), ("foo", "baz")), "bar,baz"),
({}, ""),
),
)
def test_field_simple_accessor(headers, expected):
request = make_request(headers)
assert request.headers.foo == request.headers.foo_ == expected
@pytest.mark.parametrize(
"headers,expected",
(
({"foo-bar": "bar"}, "bar"),
((("foo-bar", "bar"), ("foo-bar", "baz")), "bar,baz"),
),
)
def test_field_hyphenated_accessor(headers, expected):
request = make_request(headers)
assert request.headers.foo_bar == request.headers.foo_bar_ == expected
def test_bad_accessor():
request = make_request({})
msg = "'Header' object has no attribute '_foo'"
with pytest.raises(AttributeError, match=msg):
request.headers._foo
def test_multiple_fields_accessor(app: Sanic):
@app.get("")
async def handler(request: Request):
return json({"field": request.headers.example_field})
_, response = app.test_client.get(
"/", headers=(("Example-Field", "Foo, Bar"), ("Example-Field", "Baz"))
)
assert response.json["field"] == "Foo, Bar,Baz"

View File

@@ -150,33 +150,47 @@ def test_request_accept():
async def get(request):
return response.empty()
header_value = "text/plain;format=flowed, text/plain, text/*, */*"
request, _ = app.test_client.get(
"/",
headers={
"Accept": "text/*, text/plain, text/plain;format=flowed, */*"
},
headers={"Accept": header_value},
)
assert request.accept == [
assert str(request.accept) == header_value
match = request.accept.match(
"*/*;format=flowed",
"text/plain;format=flowed",
"text/plain",
"text/*",
"*/*",
]
)
assert match == "*/*;format=flowed"
assert match.header.mime == "text/plain"
assert match.header.params == {"format": "flowed"}
header_value = (
"text/plain; q=0.5, text/html, text/x-dvi; q=0.8, text/x-c"
)
request, _ = app.test_client.get(
"/",
headers={
"Accept": (
"text/plain; q=0.5, text/html, text/x-dvi; q=0.8, text/x-c"
)
},
headers={"Accept": header_value},
)
assert request.accept == [
assert [str(i) for i in request.accept] == [
"text/html",
"text/x-c",
"text/x-dvi; q=0.8",
"text/plain; q=0.5",
"text/x-dvi;q=0.8",
"text/plain;q=0.5",
]
match = request.accept.match(
"application/json",
"text/plain", # Has lower q in accept header
"text/html;format=flowed", # Params mismatch
"text/*", # Matches
"*/*",
)
assert match == "text/*"
assert match.header.mime == "text/html"
assert match.header.q == 1.0
assert not match.header.params
def test_bad_url_parse():

View File

@@ -514,7 +514,6 @@ def test_file_stream_head_response(
def test_file_stream_response_range(
app: Sanic, file_name, static_file_directory, size, start, end
):
Range = namedtuple("Range", ["size", "start", "end", "total"])
total = len(get_file_content(static_file_directory, file_name))
range = Range(size=size, start=start, end=end, total=total)

View File

@@ -213,12 +213,3 @@ def test_pop_list(json_app: Sanic):
_, resp = json_app.test_client.get("/json-pop")
assert resp.body == json_dumps(["b"]).encode()
def test_json_response_class_sets_proper_content_type(json_app: Sanic):
@json_app.get("/json-class")
async def handler(request: Request):
return JSONResponse(JSON_BODY)
_, resp = json_app.test_client.get("/json-class")
assert resp.headers["content-type"] == "application/json"

View File

@@ -722,7 +722,6 @@ def test_add_webscoket_route_with_version(app):
def test_route_duplicate(app):
with pytest.raises(RouteExists):
@app.route("/test")
@@ -819,7 +818,6 @@ def test_unquote_add_route(app, unquote):
def test_dynamic_add_route(app):
results = []
async def handler(request, name):
@@ -834,7 +832,6 @@ def test_dynamic_add_route(app):
def test_dynamic_add_route_string(app):
results = []
async def handler(request, name):
@@ -938,7 +935,6 @@ def test_dynamic_add_route_unhashable(app):
def test_add_route_duplicate(app):
with pytest.raises(RouteExists):
async def handler1(request):
@@ -1120,7 +1116,6 @@ def test_route_raise_ParameterNameConflicts(app):
def test_route_invalid_host(app):
host = 321
with pytest.raises(ValueError) as excinfo:

View File

@@ -93,6 +93,7 @@ def test_dont_register_system_signals(app):
@pytest.mark.skipif(os.name == "nt", reason="windows cannot SIGINT processes")
def test_windows_workaround():
"""Test Windows workaround (on any other OS)"""
# At least some code coverage, even though this test doesn't work on
# Windows...
class MockApp:

View File

@@ -1,4 +1,3 @@
import inspect
import logging
import os
import sys
@@ -13,15 +12,6 @@ from sanic import Sanic, text
from sanic.exceptions import FileNotFound
@pytest.fixture(scope="module")
def static_file_directory():
"""The static directory to serve"""
current_file = inspect.getfile(inspect.currentframe())
current_directory = os.path.dirname(os.path.abspath(current_file))
static_directory = os.path.join(current_directory, "static")
return static_directory
@pytest.fixture(scope="module")
def double_dotted_directory_file(static_file_directory: str):
"""Generate double dotted directory and its files"""
@@ -118,7 +108,12 @@ def test_static_file_pathlib(app, static_file_directory, file_name):
def test_static_file_bytes(app, static_file_directory, file_name):
bsep = os.path.sep.encode("utf-8")
file_path = static_file_directory.encode("utf-8") + bsep + file_name
app.static("/testing.file", file_path)
message = (
"Serving a static directory with a bytes "
"string is deprecated and will be removed in v22.9."
)
with pytest.warns(DeprecationWarning, match=message):
app.static("/testing.file", file_path)
request, response = app.test_client.get("/testing.file")
assert response.status == 200
@@ -431,7 +426,6 @@ def test_static_stream_large_file(
"file_name", ["test.file", "decode me.txt", "python.png"]
)
def test_use_modified_since(app, static_file_directory, file_name):
file_stat = os.stat(get_file_path(static_file_directory, file_name))
modified_since = strftime(
"%a, %d %b %Y %H:%M:%S GMT", gmtime(file_stat.st_mtime)

View File

@@ -0,0 +1,123 @@
import os
from pathlib import Path
import pytest
from sanic import Sanic
from sanic.handlers.directory import DirectoryHandler
def get_file_path(static_file_directory, file_name):
return os.path.join(static_file_directory, file_name)
def get_file_content(static_file_directory, file_name):
"""The content of the static file to check"""
with open(get_file_path(static_file_directory, file_name), "rb") as file:
return file.read()
def test_static_directory_view(app: Sanic, static_file_directory: str):
app.static("/static", static_file_directory, directory_view=True)
_, response = app.test_client.get("/static/")
assert response.status == 200
assert response.content_type == "text/html; charset=utf-8"
assert "<title>Directory Viewer</title>" in response.text
def test_static_index_single(app: Sanic, static_file_directory: str):
app.static("/static", static_file_directory, index="test.html")
_, response = app.test_client.get("/static/")
assert response.status == 200
assert response.body == get_file_content(
static_file_directory, "test.html"
)
assert response.headers["Content-Type"] == "text/html"
def test_static_index_single_not_found(app: Sanic, static_file_directory: str):
app.static("/static", static_file_directory, index="index.html")
_, response = app.test_client.get("/static/")
assert response.status == 404
def test_static_index_multiple(app: Sanic, static_file_directory: str):
app.static(
"/static",
static_file_directory,
index=["index.html", "test.html"],
)
_, response = app.test_client.get("/static/")
assert response.status == 200
assert response.body == get_file_content(
static_file_directory, "test.html"
)
assert response.headers["Content-Type"] == "text/html"
def test_static_directory_view_and_index(
app: Sanic, static_file_directory: str
):
app.static(
"/static",
static_file_directory,
directory_view=True,
index="foo.txt",
)
_, response = app.test_client.get("/static/nested/")
assert response.status == 200
assert response.content_type == "text/html; charset=utf-8"
assert "<title>Directory Viewer</title>" in response.text
_, response = app.test_client.get("/static/nested/dir/")
assert response.status == 200
assert response.body == get_file_content(
f"{static_file_directory}/nested/dir", "foo.txt"
)
assert response.content_type == "text/plain"
def test_static_directory_handler(app: Sanic, static_file_directory: str):
dh = DirectoryHandler(
"/static",
Path(static_file_directory),
directory_view=True,
index="foo.txt",
)
app.static("/static", static_file_directory, directory_handler=dh)
_, response = app.test_client.get("/static/nested/")
assert response.status == 200
assert response.content_type == "text/html; charset=utf-8"
assert "<title>Directory Viewer</title>" in response.text
_, response = app.test_client.get("/static/nested/dir/")
assert response.status == 200
assert response.body == get_file_content(
f"{static_file_directory}/nested/dir", "foo.txt"
)
assert response.content_type == "text/plain"
def test_static_directory_handler_fails(app: Sanic):
dh = DirectoryHandler(
"/static",
Path(""),
directory_view=True,
index="foo.txt",
)
message = (
"When explicitly setting directory_handler, you cannot "
"set either directory_view or index. Instead, pass "
"these arguments to your DirectoryHandler instance."
)
with pytest.raises(ValueError, match=message):
app.static("/static", "", directory_handler=dh, directory_view=True)
with pytest.raises(ValueError, match=message):
app.static("/static", "", directory_handler=dh, index="index.html")

View File

@@ -654,7 +654,6 @@ def test_sanic_ssl_context_create():
reason="This test requires fork context",
)
def test_ssl_in_multiprocess_mode(app: Sanic, caplog):
ssl_dict = {"cert": localhost_cert, "key": localhost_key}
event = Event()

View File

@@ -176,7 +176,6 @@ def handler(request: Request):
async def client(app: Sanic, loop: AbstractEventLoop):
try:
transport = httpx.AsyncHTTPTransport(uds=SOCKPATH)
async with httpx.AsyncClient(transport=transport) as client:
r = await client.get("http://myhost.invalid/")

View File

@@ -83,7 +83,6 @@ def test_simple_url_for_getting_with_more_params(app, args, url):
def test_url_for_with_server_name(app):
server_name = f"{test_host}:{test_port}"
app.config.update({"SERVER_NAME": server_name})
path = "/myurl"

View File

@@ -38,7 +38,6 @@ def test_load_module_from_file_location_with_non_existing_env_variable():
LoadFileException,
match="The following environment variables are not set: MuuMilk",
):
load_module_from_file_location("${MuuMilk}")