Merge branch 'main' of github.com:sanic-org/sanic into middleware-revamp

This commit is contained in:
Adam Hopkins 2022-09-15 18:33:22 +03:00
commit c4c39cb082
No known key found for this signature in database
GPG Key ID: 9F85EE6C807303FB
18 changed files with 371 additions and 63 deletions

View File

@ -1,6 +1,7 @@
---
name: Bug report
about: Create a report to help us improve
labels: ["bug"]
---

View File

@ -1,6 +1,7 @@
---
name: Feature request
about: Suggest an idea for Sanic
labels: ["feature request"]
---

View File

@ -458,9 +458,7 @@ class Sanic(BaseSanic, RunnerMixin, metaclass=TouchUpMeta):
def blueprint(
self,
blueprint: Union[
Blueprint, List[Blueprint], Tuple[Blueprint], BlueprintGroup
],
blueprint: Union[Blueprint, Iterable[Blueprint], BlueprintGroup],
**options: Any,
):
"""Register a blueprint on the application.
@ -469,7 +467,7 @@ class Sanic(BaseSanic, RunnerMixin, metaclass=TouchUpMeta):
:param options: option dictionary with blueprint defaults
:return: Nothing
"""
if isinstance(blueprint, (list, tuple, BlueprintGroup)):
if isinstance(blueprint, (Iterable, BlueprintGroup)):
for item in blueprint:
params = {**options}
if isinstance(blueprint, BlueprintGroup):
@ -920,9 +918,19 @@ class Sanic(BaseSanic, RunnerMixin, metaclass=TouchUpMeta):
)
# Run response handler
await self.dispatch(
"http.handler.before",
inline=True,
context={"request": request},
)
response = handler(request, **request.match_info)
if isawaitable(response):
response = await response
await self.dispatch(
"http.handler.after",
inline=True,
context={"request": request},
)
if request.responded:
if response is not None:

View File

@ -308,7 +308,7 @@ class Blueprint(BaseSanic):
# prefixed properly in the router
future.handler.__blueprintname__ = self.name
# Prepend the blueprint URI prefix if available
uri = url_prefix + future.uri if url_prefix else future.uri
uri = self._setup_uri(future.uri, url_prefix)
version_prefix = self.version_prefix
for prefix in (
@ -333,7 +333,7 @@ class Blueprint(BaseSanic):
apply_route = FutureRoute(
future.handler,
uri[1:] if uri.startswith("//") else uri,
uri,
future.methods,
host,
strict_slashes,
@ -363,7 +363,7 @@ class Blueprint(BaseSanic):
# Static Files
for future in self._future_statics:
# Prepend the blueprint URI prefix if available
uri = url_prefix + future.uri if url_prefix else future.uri
uri = self._setup_uri(future.uri, url_prefix)
apply_route = FutureStatic(uri, *future[1:])
if (self, apply_route) in app._future_registry:
@ -456,6 +456,18 @@ class Blueprint(BaseSanic):
break
return value
@staticmethod
def _setup_uri(base: str, prefix: Optional[str]):
uri = base
if prefix:
uri = prefix
if base.startswith("/") and prefix.endswith("/"):
uri += base[1:]
else:
uri += base
return uri[1:] if uri.startswith("//") else uri
@staticmethod
def register_futures(
apps: Set[Sanic], bp: Blueprint, futures: Sequence[Tuple[Any, ...]]

View File

@ -192,6 +192,7 @@ Or, a path to a directory to run as a simple HTTP server:
ssl = ssl[0]
kwargs = {
"access_log": self.args.access_log,
"coffee": self.args.coffee,
"debug": self.args.debug,
"fast": self.args.fast,
"host": self.args.host,

View File

@ -262,6 +262,12 @@ class OutputGroup(Group):
name = "Output"
def attach(self):
self.add_bool_arguments(
"--coffee",
dest="coffee",
default=False,
help="Uhm, coffee?",
)
self.add_bool_arguments(
"--motd",
dest="motd",

View File

@ -350,6 +350,28 @@ class ErrorHandler:
def _full_lookup(self, exception, route_name: Optional[str] = None):
return self.lookup(exception, route_name)
def _add(
self,
key: Tuple[Type[BaseException], Optional[str]],
handler: RouteHandler,
) -> None:
if key in self.cached_handlers:
exc, name = key
if name is None:
name = "__ALL_ROUTES__"
error_logger.warning(
f"Duplicate exception handler definition on: route={name} "
f"and exception={exc}"
)
deprecation(
"A duplicate exception handler definition was discovered. "
"This may cause unintended consequences. A warning has been "
"issued now, but it will not be allowed starting in v23.3.",
23.3,
)
self.cached_handlers[key] = handler
def add(self, exception, handler, route_names: Optional[List[str]] = None):
"""
Add a new exception handler to an already existing handler object.
@ -365,9 +387,9 @@ class ErrorHandler:
"""
if route_names:
for route in route_names:
self.cached_handlers[(exception, route)] = handler
self._add((exception, route), handler)
else:
self.cached_handlers[(exception, None)] = handler
self._add((exception, None), handler)
def lookup(self, exception, route_name: Optional[str] = None):
"""

View File

@ -958,6 +958,7 @@ class RouteMixin(metaclass=SanicMeta):
# serve from the folder
if not static.resource_type:
if not path.isfile(file_or_directory):
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "dir":
if path.isfile(file_or_directory):
@ -965,6 +966,7 @@ class RouteMixin(metaclass=SanicMeta):
"Resource type improperly identified as directory. "
f"'{file_or_directory}'"
)
uri = uri.rstrip("/")
uri += "/<__file_uri__:path>"
elif static.resource_type == "file" and not path.isfile(
file_or_directory

View File

@ -191,6 +191,7 @@ class RunnerMixin(metaclass=SanicMeta):
fast: bool = False,
verbosity: int = 0,
motd_display: Optional[Dict[str, str]] = None,
coffee: bool = False,
auto_tls: bool = False,
) -> None:
if version == 3 and self.state.server_info:
@ -265,6 +266,9 @@ class RunnerMixin(metaclass=SanicMeta):
except AttributeError: # no cov
workers = os.cpu_count() or 1
if coffee:
self.state.coffee = True
server_settings = self._helper(
host=host,
port=port,

View File

@ -1,7 +1,7 @@
from __future__ import annotations
from datetime import datetime
from email.utils import formatdate
from datetime import datetime, timezone
from email.utils import formatdate, parsedate_to_datetime
from functools import partial
from mimetypes import guess_type
from os import path
@ -33,6 +33,7 @@ from sanic.helpers import (
remove_entity_headers,
)
from sanic.http import Http
from sanic.log import logger
from sanic.models.protocol_types import HTMLProtocol, Range
@ -319,9 +320,34 @@ def html(
)
async def validate_file(
request_headers: Header, last_modified: Union[datetime, float, int]
):
try:
if_modified_since = request_headers.getone("If-Modified-Since")
except KeyError:
return
try:
if_modified_since = parsedate_to_datetime(if_modified_since)
except (TypeError, ValueError):
logger.warning(
"Ignorning invalid If-Modified-Since header received: " "'%s'",
if_modified_since,
)
return
if not isinstance(last_modified, datetime):
last_modified = datetime.fromtimestamp(
float(last_modified), tz=timezone.utc
).replace(microsecond=0)
if last_modified <= if_modified_since:
return HTTPResponse(status=304)
async def file(
location: Union[str, PurePath],
status: int = 200,
request_headers: Optional[Header] = None,
validate_when_requested: bool = True,
mime_type: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
filename: Optional[str] = None,
@ -331,7 +357,12 @@ async def file(
_range: Optional[Range] = None,
) -> HTTPResponse:
"""Return a response object with file data.
:param status: HTTP response code. Won't enforce the passed in
status if only a part of the content will be sent (206)
or file is being validated (304).
:param request_headers: The request headers.
:param validate_when_requested: If True, will validate the
file when requested.
:param location: Location of file on system.
:param mime_type: Specific mime_type.
:param headers: Custom Headers.
@ -341,11 +372,6 @@ async def file(
:param no_store: Any cache should not store this response.
:param _range:
"""
headers = headers or {}
if filename:
headers.setdefault(
"Content-Disposition", f'attachment; filename="{filename}"'
)
if isinstance(last_modified, datetime):
last_modified = last_modified.replace(microsecond=0).timestamp()
@ -353,9 +379,24 @@ async def file(
stat = await stat_async(location)
last_modified = stat.st_mtime
if (
validate_when_requested
and request_headers is not None
and last_modified
):
response = await validate_file(request_headers, last_modified)
if response:
return response
headers = headers or {}
if last_modified:
headers.setdefault(
"last-modified", formatdate(last_modified, usegmt=True)
"Last-Modified", formatdate(last_modified, usegmt=True)
)
if filename:
headers.setdefault(
"Content-Disposition", f'attachment; filename="{filename}"'
)
if no_store:

View File

@ -30,6 +30,8 @@ class Event(Enum):
HTTP_LIFECYCLE_RESPONSE = "http.lifecycle.response"
HTTP_ROUTING_AFTER = "http.routing.after"
HTTP_ROUTING_BEFORE = "http.routing.before"
HTTP_HANDLER_AFTER = "http.handler.after"
HTTP_HANDLER_BEFORE = "http.handler.before"
HTTP_LIFECYCLE_SEND = "http.lifecycle.send"
HTTP_MIDDLEWARE_AFTER = "http.middleware.after"
HTTP_MIDDLEWARE_BEFORE = "http.middleware.before"
@ -53,6 +55,8 @@ RESERVED_NAMESPACES = {
Event.HTTP_LIFECYCLE_RESPONSE.value,
Event.HTTP_ROUTING_AFTER.value,
Event.HTTP_ROUTING_BEFORE.value,
Event.HTTP_HANDLER_AFTER.value,
Event.HTTP_HANDLER_BEFORE.value,
Event.HTTP_LIFECYCLE_SEND.value,
Event.HTTP_MIDDLEWARE_AFTER.value,
Event.HTTP_MIDDLEWARE_BEFORE.value,

View File

@ -61,7 +61,7 @@ setup_kwargs = {
"Build fast. Run fast."
),
"long_description": long_description,
"packages": find_packages(),
"packages": find_packages(exclude=("tests", "tests.*")),
"package_data": {"sanic": ["py.typed"]},
"platforms": "any",
"python_requires": ">=3.7",

View File

@ -531,6 +531,8 @@ async def test_signals_triggered(app):
"http.lifecycle.handle",
"http.routing.before",
"http.routing.after",
"http.handler.before",
"http.handler.after",
"http.lifecycle.response",
# "http.lifecycle.send",
# "http.lifecycle.complete",

View File

@ -17,7 +17,7 @@ from sanic.response import json, text
# ------------------------------------------------------------ #
def test_bp(app):
def test_bp(app: Sanic):
bp = Blueprint("test_text")
@bp.route("/")
@ -30,7 +30,7 @@ def test_bp(app):
assert response.text == "Hello"
def test_bp_app_access(app):
def test_bp_app_access(app: Sanic):
bp = Blueprint("test")
with pytest.raises(
@ -87,7 +87,7 @@ def test_versioned_routes_get(app, method):
assert response.status == 200
def test_bp_strict_slash(app):
def test_bp_strict_slash(app: Sanic):
bp = Blueprint("test_text")
@bp.get("/get", strict_slashes=True)
@ -114,7 +114,7 @@ def test_bp_strict_slash(app):
assert response.status == 404
def test_bp_strict_slash_default_value(app):
def test_bp_strict_slash_default_value(app: Sanic):
bp = Blueprint("test_text", strict_slashes=True)
@bp.get("/get")
@ -134,7 +134,7 @@ def test_bp_strict_slash_default_value(app):
assert response.status == 404
def test_bp_strict_slash_without_passing_default_value(app):
def test_bp_strict_slash_without_passing_default_value(app: Sanic):
bp = Blueprint("test_text")
@bp.get("/get")
@ -154,7 +154,7 @@ def test_bp_strict_slash_without_passing_default_value(app):
assert response.text == "OK"
def test_bp_strict_slash_default_value_can_be_overwritten(app):
def test_bp_strict_slash_default_value_can_be_overwritten(app: Sanic):
bp = Blueprint("test_text", strict_slashes=True)
@bp.get("/get", strict_slashes=False)
@ -174,7 +174,7 @@ def test_bp_strict_slash_default_value_can_be_overwritten(app):
assert response.text == "OK"
def test_bp_with_url_prefix(app):
def test_bp_with_url_prefix(app: Sanic):
bp = Blueprint("test_text", url_prefix="/test1")
@bp.route("/")
@ -187,7 +187,7 @@ def test_bp_with_url_prefix(app):
assert response.text == "Hello"
def test_several_bp_with_url_prefix(app):
def test_several_bp_with_url_prefix(app: Sanic):
bp = Blueprint("test_text", url_prefix="/test1")
bp2 = Blueprint("test_text2", url_prefix="/test2")
@ -208,7 +208,7 @@ def test_several_bp_with_url_prefix(app):
assert response.text == "Hello2"
def test_bp_with_host(app):
def test_bp_with_host(app: Sanic):
bp = Blueprint("test_bp_host", url_prefix="/test1", host="example.com")
@bp.route("/")
@ -230,7 +230,7 @@ def test_bp_with_host(app):
assert response.body == b"Hello subdomain!"
def test_several_bp_with_host(app):
def test_several_bp_with_host(app: Sanic):
bp = Blueprint(
"test_text",
url_prefix="/test",
@ -274,7 +274,7 @@ def test_several_bp_with_host(app):
assert response.text == "Hello3"
def test_bp_with_host_list(app):
def test_bp_with_host_list(app: Sanic):
bp = Blueprint(
"test_bp_host",
url_prefix="/test1",
@ -304,7 +304,7 @@ def test_bp_with_host_list(app):
assert response.text == "Hello subdomain!"
def test_several_bp_with_host_list(app):
def test_several_bp_with_host_list(app: Sanic):
bp = Blueprint(
"test_text",
url_prefix="/test",
@ -356,7 +356,7 @@ def test_several_bp_with_host_list(app):
assert response.text == "Hello3"
def test_bp_middleware(app):
def test_bp_middleware(app: Sanic):
blueprint = Blueprint("test_bp_middleware")
@blueprint.middleware("response")
@ -375,7 +375,7 @@ def test_bp_middleware(app):
assert response.text == "FAIL"
def test_bp_middleware_with_route(app):
def test_bp_middleware_with_route(app: Sanic):
blueprint = Blueprint("test_bp_middleware")
@blueprint.middleware("response")
@ -398,7 +398,7 @@ def test_bp_middleware_with_route(app):
assert response.text == "OK"
def test_bp_middleware_order(app):
def test_bp_middleware_order(app: Sanic):
blueprint = Blueprint("test_bp_middleware_order")
order = []
@ -438,7 +438,7 @@ def test_bp_middleware_order(app):
assert order == [1, 2, 3, 4, 5, 6]
def test_bp_exception_handler(app):
def test_bp_exception_handler(app: Sanic):
blueprint = Blueprint("test_middleware")
@blueprint.route("/1")
@ -470,7 +470,7 @@ def test_bp_exception_handler(app):
assert response.status == 200
def test_bp_exception_handler_applied(app):
def test_bp_exception_handler_applied(app: Sanic):
class Error(Exception):
pass
@ -500,7 +500,7 @@ def test_bp_exception_handler_applied(app):
assert response.status == 500
def test_bp_exception_handler_not_applied(app):
def test_bp_exception_handler_not_applied(app: Sanic):
class Error(Exception):
pass
@ -522,7 +522,7 @@ def test_bp_exception_handler_not_applied(app):
assert response.status == 500
def test_bp_listeners(app):
def test_bp_listeners(app: Sanic):
app.route("/")(lambda x: x)
blueprint = Blueprint("test_middleware")
@ -559,7 +559,7 @@ def test_bp_listeners(app):
assert order == [1, 2, 3, 4, 5, 6]
def test_bp_static(app):
def test_bp_static(app: Sanic):
current_file = inspect.getfile(inspect.currentframe())
with open(current_file, "rb") as file:
current_file_contents = file.read()
@ -597,7 +597,7 @@ def test_bp_static_content_type(app, file_name):
assert response.headers["Content-Type"] == "text/html; charset=utf-8"
def test_bp_shorthand(app):
def test_bp_shorthand(app: Sanic):
blueprint = Blueprint("test_shorhand_routes")
ev = asyncio.Event()
@ -682,7 +682,7 @@ def test_bp_shorthand(app):
assert ev.is_set()
def test_bp_group(app):
def test_bp_group(app: Sanic):
deep_0 = Blueprint("deep_0", url_prefix="/deep")
deep_1 = Blueprint("deep_1", url_prefix="/deep1")
@ -722,7 +722,7 @@ def test_bp_group(app):
assert response.text == "D1B_OK"
def test_bp_group_with_default_url_prefix(app):
def test_bp_group_with_default_url_prefix(app: Sanic):
from sanic.response import json
bp_resources = Blueprint("bp_resources")
@ -873,7 +873,7 @@ def test_websocket_route(app: Sanic):
assert event.is_set()
def test_duplicate_blueprint(app):
def test_duplicate_blueprint(app: Sanic):
bp_name = "bp"
bp = Blueprint(bp_name)
bp1 = Blueprint(bp_name)
@ -1056,7 +1056,7 @@ def test_bp_set_attribute_warning():
bp.foo = 1
def test_early_registration(app):
def test_early_registration(app: Sanic):
assert len(app.router.routes) == 0
bp = Blueprint("bp")
@ -1082,3 +1082,29 @@ def test_early_registration(app):
for path in ("one", "two", "three"):
_, response = app.test_client.get(f"/{path}")
assert response.text == path
def test_remove_double_slashes_defined_on_bp(app: Sanic):
bp = Blueprint("bp", url_prefix="/foo/", strict_slashes=True)
@bp.get("/")
async def handler(_):
...
app.blueprint(bp)
app.router.finalize()
assert app.router.routes[0].path == "foo/"
def test_remove_double_slashes_defined_on_register(app: Sanic):
bp = Blueprint("bp")
@bp.get("/")
async def index(_):
...
app.blueprint(bp, url_prefix="/foo/", strict_slashes=True)
app.router.finalize()
assert app.router.routes[0].path == "foo/"

View File

@ -120,7 +120,7 @@ def test_error_with_path_as_instance_without_simple_arg():
),
),
)
def test_tls_options(cmd: Tuple[str]):
def test_tls_options(cmd: Tuple[str, ...]):
command = ["sanic", "fake.server.app", *cmd, "-p=9999", "--debug"]
out, err, exitcode = capture(command)
assert exitcode != 1
@ -141,7 +141,7 @@ def test_tls_options(cmd: Tuple[str]):
("--tls-strict-host",),
),
)
def test_tls_wrong_options(cmd: Tuple[str]):
def test_tls_wrong_options(cmd: Tuple[str, ...]):
command = ["sanic", "fake.server.app", *cmd, "-p=9999", "--debug"]
out, err, exitcode = capture(command)
assert exitcode == 1
@ -158,7 +158,7 @@ def test_tls_wrong_options(cmd: Tuple[str]):
("-H", "localhost", "-p", "9999"),
),
)
def test_host_port_localhost(cmd: Tuple[str]):
def test_host_port_localhost(cmd: Tuple[str, ...]):
command = ["sanic", "fake.server.app", *cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
@ -175,7 +175,7 @@ def test_host_port_localhost(cmd: Tuple[str]):
("-H", "127.0.0.127", "-p", "9999"),
),
)
def test_host_port_ipv4(cmd: Tuple[str]):
def test_host_port_ipv4(cmd: Tuple[str, ...]):
command = ["sanic", "fake.server.app", *cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
@ -192,7 +192,7 @@ def test_host_port_ipv4(cmd: Tuple[str]):
("-H", "::", "-p", "9999"),
),
)
def test_host_port_ipv6_any(cmd: Tuple[str]):
def test_host_port_ipv6_any(cmd: Tuple[str, ...]):
command = ["sanic", "fake.server.app", *cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
@ -209,7 +209,7 @@ def test_host_port_ipv6_any(cmd: Tuple[str]):
("-H", "::1", "-p", "9999"),
),
)
def test_host_port_ipv6_loopback(cmd: Tuple[str]):
def test_host_port_ipv6_loopback(cmd: Tuple[str, ...]):
command = ["sanic", "fake.server.app", *cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
@ -230,7 +230,7 @@ def test_host_port_ipv6_loopback(cmd: Tuple[str]):
(4, ("-w", "4")),
),
)
def test_num_workers(num: int, cmd: Tuple[str]):
def test_num_workers(num: int, cmd: Tuple[str, ...]):
command = ["sanic", "fake.server.app", *cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")

View File

@ -7,7 +7,7 @@ from unittest.mock import Mock
import pytest
from bs4 import BeautifulSoup
from pytest import LogCaptureFixture, MonkeyPatch
from pytest import LogCaptureFixture, MonkeyPatch, WarningsRecorder
from sanic import Sanic, handlers
from sanic.exceptions import BadRequest, Forbidden, NotFound, ServerError
@ -266,3 +266,22 @@ def test_exception_handler_response_was_sent(
_, response = app.test_client.get("/2")
assert "Error" in response.text
def test_warn_on_duplicate(
app: Sanic, caplog: LogCaptureFixture, recwarn: WarningsRecorder
):
@app.exception(ServerError)
async def exception_handler_1(request, exception):
...
@app.exception(ServerError)
async def exception_handler_2(request, exception):
...
assert len(caplog.records) == 1
assert len(recwarn) == 1
assert caplog.records[0].message == (
"Duplicate exception handler definition on: route=__ALL_ROUTES__ and "
"exception=<class 'sanic.exceptions.ServerError'>"
)

36
tests/test_handler.py Normal file
View File

@ -0,0 +1,36 @@
from sanic.app import Sanic
from sanic.response import empty
from sanic.signals import Event
def test_handler_operation_order(app: Sanic):
operations = []
@app.on_request
async def on_request(_):
nonlocal operations
operations.append(1)
@app.on_response
async def on_response(*_):
nonlocal operations
operations.append(5)
@app.get("/")
async def handler(_):
nonlocal operations
operations.append(3)
return empty()
@app.signal(Event.HTTP_HANDLER_BEFORE)
async def handler_before(**_):
nonlocal operations
operations.append(2)
@app.signal(Event.HTTP_HANDLER_AFTER)
async def handler_after(**_):
nonlocal operations
operations.append(4)
app.test_client.get("/")
assert operations == [1, 2, 3, 4, 5]

View File

@ -1,6 +1,7 @@
import asyncio
import inspect
import os
import time
from collections import namedtuple
from datetime import datetime
@ -730,8 +731,10 @@ def test_file_response_headers(
test_expires = test_last_modified.timestamp() + test_max_age
@app.route("/files/cached/<filename>", methods=["GET"])
def file_route_cache(request, filename):
file_path = (Path(static_file_directory) / file_name).absolute()
def file_route_cache(request: Request, filename: str):
file_path = (
Path(static_file_directory) / unquote(filename)
).absolute()
return file(
file_path, max_age=test_max_age, last_modified=test_last_modified
)
@ -739,18 +742,26 @@ def test_file_response_headers(
@app.route(
"/files/cached_default_last_modified/<filename>", methods=["GET"]
)
def file_route_cache_default_last_modified(request, filename):
file_path = (Path(static_file_directory) / file_name).absolute()
def file_route_cache_default_last_modified(
request: Request, filename: str
):
file_path = (
Path(static_file_directory) / unquote(filename)
).absolute()
return file(file_path, max_age=test_max_age)
@app.route("/files/no_cache/<filename>", methods=["GET"])
def file_route_no_cache(request, filename):
file_path = (Path(static_file_directory) / file_name).absolute()
def file_route_no_cache(request: Request, filename: str):
file_path = (
Path(static_file_directory) / unquote(filename)
).absolute()
return file(file_path)
@app.route("/files/no_store/<filename>", methods=["GET"])
def file_route_no_store(request, filename):
file_path = (Path(static_file_directory) / file_name).absolute()
def file_route_no_store(request: Request, filename: str):
file_path = (
Path(static_file_directory) / unquote(filename)
).absolute()
return file(file_path, no_store=True)
_, response = app.test_client.get(f"/files/cached/{file_name}")
@ -767,11 +778,11 @@ def test_file_response_headers(
== formatdate(test_expires, usegmt=True)[:-6]
# [:-6] to allow at most 1 min difference
# It's minimal for cases like:
# Thu, 26 May 2022 05:36:49 GMT
# Thu, 26 May 2022 05:36:59 GMT
# AND
# Thu, 26 May 2022 05:36:50 GMT
# Thu, 26 May 2022 05:37:00 GMT
)
assert response.status == 200
assert "last-modified" in headers and headers.get(
"last-modified"
) == formatdate(test_last_modified.timestamp(), usegmt=True)
@ -786,15 +797,127 @@ def test_file_response_headers(
assert "last-modified" in headers and headers.get(
"last-modified"
) == formatdate(file_last_modified, usegmt=True)
assert response.status == 200
_, response = app.test_client.get(f"/files/no_cache/{file_name}")
headers = response.headers
assert "cache-control" in headers and f"no-cache" == headers.get(
"cache-control"
)
assert response.status == 200
_, response = app.test_client.get(f"/files/no_store/{file_name}")
headers = response.headers
assert "cache-control" in headers and f"no-store" == headers.get(
"cache-control"
)
assert response.status == 200
def test_file_validate(app: Sanic, static_file_directory: str):
file_name = "test_validate.txt"
static_file_directory = Path(static_file_directory)
file_path = static_file_directory / file_name
file_path = file_path.absolute()
test_max_age = 10
with open(file_path, "w+") as f:
f.write("foo\n")
@app.route("/validate", methods=["GET"])
def file_route_cache(request: Request):
return file(
file_path,
request_headers=request.headers,
max_age=test_max_age,
validate_when_requested=True,
)
_, response = app.test_client.get("/validate")
assert response.status == 200
assert response.body == b"foo\n"
last_modified = response.headers["Last-Modified"]
time.sleep(1)
with open(file_path, "a") as f:
f.write("bar\n")
_, response = app.test_client.get(
"/validate", headers={"If-Modified-Since": last_modified}
)
assert response.status == 200
assert response.body == b"foo\nbar\n"
last_modified = response.headers["Last-Modified"]
_, response = app.test_client.get(
"/validate", headers={"if-modified-since": last_modified}
)
assert response.status == 304
assert response.body == b""
file_path.unlink()
@pytest.mark.parametrize(
"file_name", ["test.file", "decode me.txt", "python.png"]
)
def test_file_validating_invalid_header(
app: Sanic, file_name: str, static_file_directory: str
):
@app.route("/files/<filename>", methods=["GET"])
def file_route(request: Request, filename: str):
handler_file_path = (
Path(static_file_directory) / unquote(filename)
).absolute()
return file(
handler_file_path,
request_headers=request.headers,
validate_when_requested=True,
)
_, response = app.test_client.get(f"/files/{file_name}")
assert response.status == 200
assert response.body == get_file_content(static_file_directory, file_name)
_, response = app.test_client.get(
f"/files/{file_name}", headers={"if-modified-since": "invalid-value"}
)
assert response.status == 200
assert response.body == get_file_content(static_file_directory, file_name)
_, response = app.test_client.get(
f"/files/{file_name}", headers={"if-modified-since": ""}
)
assert response.status == 200
assert response.body == get_file_content(static_file_directory, file_name)
@pytest.mark.parametrize(
"file_name", ["test.file", "decode me.txt", "python.png"]
)
def test_file_validating_304_response(
app: Sanic, file_name: str, static_file_directory: str
):
@app.route("/files/<filename>", methods=["GET"])
def file_route(request: Request, filename: str):
handler_file_path = (
Path(static_file_directory) / unquote(filename)
).absolute()
return file(
handler_file_path,
request_headers=request.headers,
validate_when_requested=True,
)
_, response = app.test_client.get(f"/files/{file_name}")
assert response.status == 200
assert response.body == get_file_content(static_file_directory, file_name)
_, response = app.test_client.get(
f"/files/{file_name}",
headers={"if-modified-since": response.headers["Last-Modified"]},
)
assert response.status == 304
assert response.body == b""