Merge in main to current-release (#2254)

* Remove unnecessary import in test_constants.py, which also fixes an error on win (#2180)

Co-authored-by: Adam Hopkins <admhpkns@gmail.com>

* Manually reset the buffer when streaming request body (#2183)

* Remove Duplicated Dependencies and PEP 517 Support (#2173)

* Remove duplicated dependencies

* Specify setuptools as the tool for generating distribution (PEP 517)

* Add `isort` to `dev_require`

* manage all dependencies in setup.py

* Execute `make pretty`

* Set usedevelop to true (revert previous change)

* Fix the handling of the end of a chunked request. (#2188)

* Fix the handling of the end of a chunked request.

* Avoid hardcoding final chunk header size.

* Add some unit tests for pipeline body reading

* Decode bytes for json serialization

Co-authored-by: L. Kärkkäinen <tronic@users.noreply.github.com>
Co-authored-by: Adam Hopkins <adam@amhopkins.com>

* Resolve regressions in exceptions (#2181)

* Update sanic-routing to fix path issues plus lookahead / lookbehind support (#2178)

* Update sanic-routing to fix path issues plus lookahead / lookbehind support

* Update setup.py

Co-authored-by: Adam Hopkins <adam@amhopkins.com>
Co-authored-by: Adam Hopkins <admhpkns@gmail.com>

* style(app,blueprints): add some type hints (#2196)

* style(app,blueprints): add some type hints

* style(app): option is Any

* style(blueprints): url prefix default value is ``""``

* style(app): backward compatible

* style(app): backward compatible

* style(blueprints): defult is None

* style(app): apply code style (black)

* Update some CC config (#2199)

* Update README.rst

* raise exception for `_static_request_handler` unknown exception; add test with custom error (#2195)

Co-authored-by: n.feofanov <n.feofanov@visionlabs.ru>
Co-authored-by: Adam Hopkins <admhpkns@gmail.com>

* Change dumps to AnyStr (#2193)

* HTTP tests (#2194)

* Fix issues with after request handling in HTTP pipelining (#2201)

* Clean up after a request is complete, before the next pipelined request.

* Limit the size of request body consumed after handler has finished.

* Linter error.

* Add unit test re: bad headers

Co-authored-by: L. Kärkkäinen <tronic@users.noreply.github.com>
Co-authored-by: Adam Hopkins <admhpkns@gmail.com>
Co-authored-by: Adam Hopkins <adam@amhopkins.com>

* Update CHANGELOG

* Log remote address if available (#2207)

* Log remote address if available

* Add tests

* Fix testing version

Co-authored-by: Adam Hopkins <adam@amhopkins.com>

* Fixed for handling exceptions of asgi app call. (#2211)

@cansarigol3megawatt Thanks for looking into this and getting the quick turnaround on this. I will 🍒 pick this into the 21.6 branch and get it out a little later tonight.

* Signals Integration (#2160)

* Update some tests

* Resolve #2122 route decorator returning tuple

* Use rc sanic-routing version

* Update unit tests to <:str>

* Minimal working version with some signals implemented

* Add more http signals

* Update ASGI and change listeners to signals

* Allow for dynamic ODE signals

* Allow signals to be stacked

* Begin tests

* Prioritize match_info on keyword argument injection

* WIP on tests

* Compat with signals

* Work through some test coverage

* Passing tests

* Post linting

* Setup proper resets

* coverage reporting

* Fixes from vltr comments

* clear delayed tasks

* Fix bad test

* rm pycache

* uncomment windows tests (#2214)

* Add convenience methods to BP groups (#2209)

* Fix bug where ws exceptions not being logged (#2213)

* Fix bug where ws exceptions not being logged

* Fix t\est

* Style: add type hints (#2217)

* style(routes): add_route argument, return typing

* style(listeners): typing

* style(views): typing as_view

* style(routes): change type hint

* style(listeners): change type hint

* style(routes): change type hint

* add some more types

* Change as_view typing

* Add some cleaner type annotations

Co-authored-by: Adam Hopkins <adam@amhopkins.com>

* Add default messages to SanicExceptions (#2216)

* Add default messages to SanicExceptions

* Cleaner exception message setting

* Copy Blueprints Implementation (#2184)

* Accept header parsing (#2200)

* Add some tests

* docstring

* Add accept matching

* Add some more tests on matching

* Add matching flags for wildcards

* Add mathing controls to accept

* Limit uvicorn 14 in testing

* Add convenience for annotated handlers (#2225)

* Split HttpProtocol parts into base SanicProtocol and HTTPProtocol subclass (#2229)

* Split HttpProtocol parts into base SanicProtocol and HTTPProtocol subclass.

* lint fixes

* re-black server.py

* Move server.py into its own module (#2230)

* Move server.py into its own module

* Change monkeypatch path on test_logging.py

* Blueprint specific exception handlers (#2208)

* Call abort() on sockets after close() to prevent dangling sockets (#2231)

* Add ability to return Falsey but not-None from handlers (#2236)

* Adds Blueprint Group exception decorator (#2238)

* Add exception decorator

* Added tests

* Fix line too long

* Static DIR and FILE resource types (#2244)

* Explicit static directive for serving file or dir


Co-authored-by: anbuhckr <36891836+anbuhckr@users.noreply.github.com>
Co-authored-by: anbuhckr <miki.suhendra@gmail.com>

* Close HTTP loop when connection task cancelled (#2245)

* Terminate loop when no transport exists

* Add log when closing HTTP loop because of shutdown

* Add unit test

* New websockets (#2158)

* First attempt at new Websockets implementation based on websockets >= 9.0, with sans-i/o features. Requires more work.

* Update sanic/websocket.py

Co-authored-by: Adam Hopkins <adam@amhopkins.com>

* Update sanic/websocket.py

Co-authored-by: Adam Hopkins <adam@amhopkins.com>

* Update sanic/websocket.py

Co-authored-by: Adam Hopkins <adam@amhopkins.com>

* wip, update websockets code to new Sans/IO API

* Refactored new websockets impl into own modules
Incorporated other suggestions made by team

* Another round of work on the new websockets impl
* Added websocket_timeout support (matching previous/legacy support)
* Lots more comments
* Incorporated suggested changes from previous round of review
* Changed RuntimeError usage to ServerError
* Changed SanicException usage to ServerError
* Removed some redundant asserts
* Change remaining asserts to ServerErrors
* Fixed some timeout handling issues
* Fixed websocket.close() handling, and made it more robust
* Made auto_close task smarter and more error-resilient
* Made fail_connection routine smarter and more error-resilient

* Further new websockets impl fixes
* Update compatibility with Websockets v10
* Track server connection state in a more precise way
* Try to handle the shutdown process more gracefully
* Add a new end_connection() helper, to use as an alterative to close() or fail_connection()
* Kill the auto-close task and keepalive-timeout task when sanic is shutdown
* Deprecate WEBSOCKET_READ_LIMIT and WEBSOCKET_WRITE_LIMIT configs, they are not used in this implementation.

* Change a warning message to debug level
Remove default values for deprecated websocket parameters

* Fix flake8 errors

* Fix a couple of missed failing tests

* remove websocket bench from examples

* Integrate suggestions from code reviews
Use Optional[T] instead of union[T,None]
Fix mypy type logic errors
change "is not None" to truthy checks where appropriate
change "is None" to falsy checks were appropriate
Add more debug logging when debug mode is on
Change to using sanic.logger for debug logging rather than error_logger.

* Fix long line lengths of debug messages
Add some new debug messages when websocket IO is paused and unpaused for flow control
Fix websocket example to use app.static()

* remove unused import in websocket example app

* re-run isort after Flake8 fixes

Co-authored-by: Adam Hopkins <adam@amhopkins.com>
Co-authored-by: Adam Hopkins <admhpkns@gmail.com>

* Account for BP with exception handler but no routes (#2246)

* Don't log "enabled" if auto-reload disabled (#2247)

Fixes #2240

Co-authored-by: Adam Hopkins <admhpkns@gmail.com>

* Smarter auto fallback (#2162)

* Smarter auto fallback

* remove config from blueprints

* Add tests for error formatting

* Add check for proper format

* Fix some tests

* Add some tests

* docstring

* Add accept matching

* Add some more tests on matching

* Fix contains bug, earlier return on MediaType eq

* Add matching flags for wildcards

* Add mathing controls to accept

* Cleanup dev cruft

* Add cleanup and resolve OSError relating to test implementation

* Fix test

* Fix some typos

* Some fixes to the new Websockets impl (#2248)

* First attempt at new Websockets implementation based on websockets >= 9.0, with sans-i/o features. Requires more work.

* Update sanic/websocket.py

Co-authored-by: Adam Hopkins <adam@amhopkins.com>

* Update sanic/websocket.py

Co-authored-by: Adam Hopkins <adam@amhopkins.com>

* Update sanic/websocket.py

Co-authored-by: Adam Hopkins <adam@amhopkins.com>

* wip, update websockets code to new Sans/IO API

* Refactored new websockets impl into own modules
Incorporated other suggestions made by team

* Another round of work on the new websockets impl
* Added websocket_timeout support (matching previous/legacy support)
* Lots more comments
* Incorporated suggested changes from previous round of review
* Changed RuntimeError usage to ServerError
* Changed SanicException usage to ServerError
* Removed some redundant asserts
* Change remaining asserts to ServerErrors
* Fixed some timeout handling issues
* Fixed websocket.close() handling, and made it more robust
* Made auto_close task smarter and more error-resilient
* Made fail_connection routine smarter and more error-resilient

* Further new websockets impl fixes
* Update compatibility with Websockets v10
* Track server connection state in a more precise way
* Try to handle the shutdown process more gracefully
* Add a new end_connection() helper, to use as an alterative to close() or fail_connection()
* Kill the auto-close task and keepalive-timeout task when sanic is shutdown
* Deprecate WEBSOCKET_READ_LIMIT and WEBSOCKET_WRITE_LIMIT configs, they are not used in this implementation.

* Change a warning message to debug level
Remove default values for deprecated websocket parameters

* Fix flake8 errors

* Fix a couple of missed failing tests

* remove websocket bench from examples

* Integrate suggestions from code reviews
Use Optional[T] instead of union[T,None]
Fix mypy type logic errors
change "is not None" to truthy checks where appropriate
change "is None" to falsy checks were appropriate
Add more debug logging when debug mode is on
Change to using sanic.logger for debug logging rather than error_logger.

* Fix long line lengths of debug messages
Add some new debug messages when websocket IO is paused and unpaused for flow control
Fix websocket example to use app.static()

* remove unused import in websocket example app

* re-run isort after Flake8 fixes

* Some fixes to the new Websockets impl
Will throw WebsocketClosed exception instead of ServerException now when attempting to read or write to closed websocket, this makes it easier to catch
The various ws.recv() methods now have the ability to raise CancelledError into your websocket handler
Fix a niche close-socket negotiation bug
Fix bug where http protocol thought the websocket never sent any response.
Allow data to still send in some cases after websocket enters CLOSING state.
Fix some badly formatted and badly placed comments

* allow eof_received to send back data too, if the connection is in CLOSING state

Co-authored-by: Adam Hopkins <adam@amhopkins.com>
Co-authored-by: Adam Hopkins <admhpkns@gmail.com>

* 21.9 release docs (#2218)

* Beging 21.9 release docs

* Add PRs to changelog

* Change deprecation version

* Update logging tests

* Bump version

* Update changelog

* Change dev install command (#2251)

Co-authored-by: Zhiwei <zhi.wei.liang@outlook.com>
Co-authored-by: L. Kärkkäinen <98187+Tronic@users.noreply.github.com>
Co-authored-by: L. Kärkkäinen <tronic@users.noreply.github.com>
Co-authored-by: Robert Palmer <robd003@users.noreply.github.com>
Co-authored-by: Ryu JuHeon <saidbysolo@gmail.com>
Co-authored-by: gluhar2006 <49654448+gluhar2006@users.noreply.github.com>
Co-authored-by: n.feofanov <n.feofanov@visionlabs.ru>
Co-authored-by: Néstor Pérez <25409753+prryplatypus@users.noreply.github.com>
Co-authored-by: Can Sarigol <56863826+cansarigol3megawatt@users.noreply.github.com>
Co-authored-by: Zhiwei <chihwei.public@outlook.com>
Co-authored-by: YongChan Cho <h3236516@gmail.com>
Co-authored-by: Zhiwei <zhiwei@sinatra.ai>
Co-authored-by: Ashley Sommer <ashleysommer@gmail.com>
Co-authored-by: anbuhckr <36891836+anbuhckr@users.noreply.github.com>
Co-authored-by: anbuhckr <miki.suhendra@gmail.com>
This commit is contained in:
Adam Hopkins
2021-10-02 21:55:23 +03:00
committed by GitHub
parent 5308fec354
commit bc08383acd
92 changed files with 5109 additions and 1888 deletions

View File

@@ -1,3 +1,5 @@
import asyncio
import logging
import random
import re
import string
@@ -9,10 +11,12 @@ from typing import Tuple
import pytest
from sanic_routing.exceptions import RouteExists
from sanic_testing.testing import PORT
from sanic import Sanic
from sanic.constants import HTTP_METHODS
from sanic.router import Router
from sanic.touchup.service import TouchUp
slugify = re.compile(r"[^a-zA-Z0-9_\-]")
@@ -23,11 +27,6 @@ if sys.platform in ["win32", "cygwin"]:
collect_ignore = ["test_worker.py"]
@pytest.fixture
def caplog(caplog):
yield caplog
async def _handler(request):
"""
Dummy placeholder method used for route resolver when creating a new
@@ -41,33 +40,32 @@ async def _handler(request):
TYPE_TO_GENERATOR_MAP = {
"string": lambda: "".join(
"str": lambda: "".join(
[random.choice(string.ascii_lowercase) for _ in range(4)]
),
"int": lambda: random.choice(range(1000000)),
"number": lambda: random.random(),
"float": lambda: random.random(),
"alpha": lambda: "".join(
[random.choice(string.ascii_lowercase) for _ in range(4)]
),
"uuid": lambda: str(uuid.uuid1()),
}
CACHE = {}
class RouteStringGenerator:
ROUTE_COUNT_PER_DEPTH = 100
HTTP_METHODS = HTTP_METHODS
ROUTE_PARAM_TYPES = ["string", "int", "number", "alpha", "uuid"]
ROUTE_PARAM_TYPES = ["str", "int", "float", "alpha", "uuid"]
def generate_random_direct_route(self, max_route_depth=4):
routes = []
for depth in range(1, max_route_depth + 1):
for _ in range(self.ROUTE_COUNT_PER_DEPTH):
route = "/".join(
[
TYPE_TO_GENERATOR_MAP.get("string")()
for _ in range(depth)
]
[TYPE_TO_GENERATOR_MAP.get("str")() for _ in range(depth)]
)
route = route.replace(".", "", -1)
route_detail = (random.choice(self.HTTP_METHODS), route)
@@ -83,7 +81,7 @@ class RouteStringGenerator:
new_route_part = "/".join(
[
"<{}:{}>".format(
TYPE_TO_GENERATOR_MAP.get("string")(),
TYPE_TO_GENERATOR_MAP.get("str")(),
random.choice(self.ROUTE_PARAM_TYPES),
)
for _ in range(max_route_depth - current_length)
@@ -98,7 +96,7 @@ class RouteStringGenerator:
def generate_url_for_template(template):
url = template
for pattern, param_type in re.findall(
re.compile(r"((?:<\w+:(string|int|number|alpha|uuid)>)+)"),
re.compile(r"((?:<\w+:(str|int|float|alpha|uuid)>)+)"),
template,
):
value = TYPE_TO_GENERATOR_MAP.get(param_type)()
@@ -111,6 +109,7 @@ def sanic_router(app):
# noinspection PyProtectedMember
def _setup(route_details: tuple) -> Tuple[Router, tuple]:
router = Router()
router.ctx.app = app
added_router = []
for method, route in route_details:
try:
@@ -141,5 +140,33 @@ def url_param_generator():
@pytest.fixture(scope="function")
def app(request):
if not CACHE:
for target, method_name in TouchUp._registry:
CACHE[method_name] = getattr(target, method_name)
app = Sanic(slugify.sub("-", request.node.name))
return app
yield app
for target, method_name in TouchUp._registry:
setattr(target, method_name, CACHE[method_name])
@pytest.fixture(scope="function")
def run_startup(caplog):
def run(app):
nonlocal caplog
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
with caplog.at_level(logging.DEBUG):
server = app.create_server(
debug=True, return_asyncio_server=True, port=PORT
)
loop._stopping = False
_server = loop.run_until_complete(server)
_server.close()
loop.run_until_complete(_server.wait_closed())
app.stop()
return caplog.record_tuples
return run

View File

@@ -178,9 +178,6 @@ def test_app_enable_websocket(app, websocket_enabled, enable):
@patch("sanic.app.WebSocketProtocol")
def test_app_websocket_parameters(websocket_protocol_mock, app):
app.config.WEBSOCKET_MAX_SIZE = 44
app.config.WEBSOCKET_MAX_QUEUE = 45
app.config.WEBSOCKET_READ_LIMIT = 46
app.config.WEBSOCKET_WRITE_LIMIT = 47
app.config.WEBSOCKET_PING_TIMEOUT = 48
app.config.WEBSOCKET_PING_INTERVAL = 50
@@ -197,11 +194,6 @@ def test_app_websocket_parameters(websocket_protocol_mock, app):
websocket_protocol_call_args = websocket_protocol_mock.call_args
ws_kwargs = websocket_protocol_call_args[1]
assert ws_kwargs["websocket_max_size"] == app.config.WEBSOCKET_MAX_SIZE
assert ws_kwargs["websocket_max_queue"] == app.config.WEBSOCKET_MAX_QUEUE
assert ws_kwargs["websocket_read_limit"] == app.config.WEBSOCKET_READ_LIMIT
assert (
ws_kwargs["websocket_write_limit"] == app.config.WEBSOCKET_WRITE_LIMIT
)
assert (
ws_kwargs["websocket_ping_timeout"]
== app.config.WEBSOCKET_PING_TIMEOUT
@@ -396,7 +388,7 @@ def test_app_set_attribute_warning(app):
assert len(record) == 1
assert record[0].message.args[0] == (
"Setting variables on Sanic instances is deprecated "
"and will be removed in version 21.9. You should change your "
"and will be removed in version 21.12. You should change your "
"Sanic instance to use instance.ctx.foo instead."
)

View File

@@ -10,7 +10,7 @@ from sanic.asgi import MockTransport
from sanic.exceptions import Forbidden, InvalidUsage, ServiceUnavailable
from sanic.request import Request
from sanic.response import json, text
from sanic.websocket import WebSocketConnection
from sanic.server.websockets.connection import WebSocketConnection
@pytest.fixture
@@ -360,6 +360,7 @@ async def test_request_handle_exception(app):
_, response = await app.asgi_client.get("/error-prone")
assert response.status_code == 503
@pytest.mark.asyncio
async def test_request_exception_suppressed_by_middleware(app):
@app.get("/error-prone")
@@ -374,4 +375,4 @@ async def test_request_exception_suppressed_by_middleware(app):
assert response.status_code == 403
_, response = await app.asgi_client.get("/error-prone")
assert response.status_code == 403
assert response.status_code == 403

View File

@@ -20,4 +20,4 @@ def test_bad_request_response(app):
app.run(host="127.0.0.1", port=42101, debug=False)
assert lines[0] == b"HTTP/1.1 400 Bad Request\r\n"
assert b"Bad Request" in lines[-1]
assert b"Bad Request" in lines[-2]

View File

@@ -0,0 +1,70 @@
from copy import deepcopy
from sanic import Blueprint, Sanic, blueprints, response
from sanic.response import text
def test_bp_copy(app: Sanic):
bp1 = Blueprint("test_bp1", version=1)
bp1.ctx.test = 1
assert hasattr(bp1.ctx, "test")
@bp1.route("/page")
def handle_request(request):
return text("Hello world!")
bp2 = bp1.copy(name="test_bp2", version=2)
assert id(bp1) != id(bp2)
assert bp1._apps == bp2._apps == set()
assert not hasattr(bp2.ctx, "test")
assert len(bp2._future_exceptions) == len(bp1._future_exceptions)
assert len(bp2._future_listeners) == len(bp1._future_listeners)
assert len(bp2._future_middleware) == len(bp1._future_middleware)
assert len(bp2._future_routes) == len(bp1._future_routes)
assert len(bp2._future_signals) == len(bp1._future_signals)
app.blueprint(bp1)
app.blueprint(bp2)
bp3 = bp1.copy(name="test_bp3", version=3, with_registration=True)
assert id(bp1) != id(bp3)
assert bp1._apps == bp3._apps and bp3._apps
assert not hasattr(bp3.ctx, "test")
bp4 = bp1.copy(name="test_bp4", version=4, with_ctx=True)
assert id(bp1) != id(bp4)
assert bp4.ctx.test == 1
bp5 = bp1.copy(name="test_bp5", version=5, with_registration=False)
assert id(bp1) != id(bp5)
assert not bp5._apps
assert bp1._apps != set()
app.blueprint(bp5)
bp6 = bp1.copy(
name="test_bp6",
version=6,
with_registration=True,
version_prefix="/version",
)
assert bp6._apps
assert bp6.version_prefix == "/version"
_, response = app.test_client.get("/v1/page")
assert "Hello world!" in response.text
_, response = app.test_client.get("/v2/page")
assert "Hello world!" in response.text
_, response = app.test_client.get("/v3/page")
assert "Hello world!" in response.text
_, response = app.test_client.get("/v4/page")
assert "Hello world!" in response.text
_, response = app.test_client.get("/v5/page")
assert "Hello world!" in response.text
_, response = app.test_client.get("/version6/page")
assert "Hello world!" in response.text

View File

@@ -3,6 +3,12 @@ from pytest import raises
from sanic.app import Sanic
from sanic.blueprint_group import BlueprintGroup
from sanic.blueprints import Blueprint
from sanic.exceptions import (
Forbidden,
InvalidUsage,
SanicException,
ServerError,
)
from sanic.request import Request
from sanic.response import HTTPResponse, text
@@ -96,16 +102,28 @@ def test_bp_group(app: Sanic):
def blueprint_1_default_route(request):
return text("BP1_OK")
@blueprint_1.route("/invalid")
def blueprint_1_error(request: Request):
raise InvalidUsage("Invalid")
@blueprint_2.route("/")
def blueprint_2_default_route(request):
return text("BP2_OK")
@blueprint_2.route("/error")
def blueprint_2_error(request: Request):
raise ServerError("Error")
blueprint_group_1 = Blueprint.group(
blueprint_1, blueprint_2, url_prefix="/bp"
)
blueprint_3 = Blueprint("blueprint_3", url_prefix="/bp3")
@blueprint_group_1.exception(InvalidUsage)
def handle_group_exception(request, exception):
return text("BP1_ERR_OK")
@blueprint_group_1.middleware("request")
def blueprint_group_1_middleware(request):
global MIDDLEWARE_INVOKE_COUNTER
@@ -116,19 +134,47 @@ def test_bp_group(app: Sanic):
global MIDDLEWARE_INVOKE_COUNTER
MIDDLEWARE_INVOKE_COUNTER["request"] += 1
@blueprint_group_1.on_request
def blueprint_group_1_convenience_1(request):
global MIDDLEWARE_INVOKE_COUNTER
MIDDLEWARE_INVOKE_COUNTER["request"] += 1
@blueprint_group_1.on_request()
def blueprint_group_1_convenience_2(request):
global MIDDLEWARE_INVOKE_COUNTER
MIDDLEWARE_INVOKE_COUNTER["request"] += 1
@blueprint_3.route("/")
def blueprint_3_default_route(request):
return text("BP3_OK")
@blueprint_3.route("/forbidden")
def blueprint_3_forbidden(request: Request):
raise Forbidden("Forbidden")
blueprint_group_2 = Blueprint.group(
blueprint_group_1, blueprint_3, url_prefix="/api"
)
@blueprint_group_2.exception(SanicException)
def handle_non_handled_exception(request, exception):
return text("BP2_ERR_OK")
@blueprint_group_2.middleware("response")
def blueprint_group_2_middleware(request, response):
global MIDDLEWARE_INVOKE_COUNTER
MIDDLEWARE_INVOKE_COUNTER["response"] += 1
@blueprint_group_2.on_response
def blueprint_group_2_middleware_convenience_1(request, response):
global MIDDLEWARE_INVOKE_COUNTER
MIDDLEWARE_INVOKE_COUNTER["response"] += 1
@blueprint_group_2.on_response()
def blueprint_group_2_middleware_convenience_2(request, response):
global MIDDLEWARE_INVOKE_COUNTER
MIDDLEWARE_INVOKE_COUNTER["response"] += 1
app.blueprint(blueprint_group_2)
@app.route("/")
@@ -141,14 +187,23 @@ def test_bp_group(app: Sanic):
_, response = app.test_client.get("/api/bp/bp1")
assert response.text == "BP1_OK"
_, response = app.test_client.get("/api/bp/bp1/invalid")
assert response.text == "BP1_ERR_OK"
_, response = app.test_client.get("/api/bp/bp2")
assert response.text == "BP2_OK"
_, response = app.test_client.get("/api/bp/bp2/error")
assert response.text == "BP2_ERR_OK"
_, response = app.test_client.get("/api/bp3")
assert response.text == "BP3_OK"
assert MIDDLEWARE_INVOKE_COUNTER["response"] == 3
assert MIDDLEWARE_INVOKE_COUNTER["request"] == 4
_, response = app.test_client.get("/api/bp3/forbidden")
assert response.text == "BP2_ERR_OK"
assert MIDDLEWARE_INVOKE_COUNTER["response"] == 18
assert MIDDLEWARE_INVOKE_COUNTER["request"] == 16
def test_bp_group_list_operations(app: Sanic):

View File

@@ -83,7 +83,6 @@ def test_versioned_routes_get(app, method):
return text("OK")
else:
print(func)
raise Exception(f"{func} is not callable")
app.blueprint(bp)
@@ -477,6 +476,58 @@ def test_bp_exception_handler(app):
assert response.status == 200
def test_bp_exception_handler_applied(app):
class Error(Exception):
pass
handled = Blueprint("handled")
nothandled = Blueprint("nothandled")
@handled.exception(Error)
def handle_error(req, e):
return text("handled {}".format(e))
@handled.route("/ok")
def ok(request):
raise Error("uh oh")
@nothandled.route("/notok")
def notok(request):
raise Error("uh oh")
app.blueprint(handled)
app.blueprint(nothandled)
_, response = app.test_client.get("/ok")
assert response.status == 200
assert response.text == "handled uh oh"
_, response = app.test_client.get("/notok")
assert response.status == 500
def test_bp_exception_handler_not_applied(app):
class Error(Exception):
pass
handled = Blueprint("handled")
nothandled = Blueprint("nothandled")
@handled.exception(Error)
def handle_error(req, e):
return text("handled {}".format(e))
@nothandled.route("/notok")
def notok(request):
raise Error("uh oh")
app.blueprint(handled)
app.blueprint(nothandled)
_, response = app.test_client.get("/notok")
assert response.status == 500
def test_bp_listeners(app):
app.route("/")(lambda x: x)
blueprint = Blueprint("test_middleware")
@@ -1034,6 +1085,6 @@ def test_bp_set_attribute_warning():
assert len(record) == 1
assert record[0].message.args[0] == (
"Setting variables on Blueprint instances is deprecated "
"and will be removed in version 21.9. You should change your "
"and will be removed in version 21.12. You should change your "
"Blueprint instance to use instance.ctx.foo instead."
)

View File

@@ -89,7 +89,7 @@ def test_debug(cmd):
out, err, exitcode = capture(command)
lines = out.split(b"\n")
app_info = lines[9]
app_info = lines[26]
info = json.loads(app_info)
assert (b"\n".join(lines[:6])).decode("utf-8") == BASE_LOGO
@@ -103,7 +103,7 @@ def test_auto_reload(cmd):
out, err, exitcode = capture(command)
lines = out.split(b"\n")
app_info = lines[9]
app_info = lines[26]
info = json.loads(app_info)
assert info["debug"] is False
@@ -118,7 +118,7 @@ def test_access_logs(cmd, expected):
out, err, exitcode = capture(command)
lines = out.split(b"\n")
app_info = lines[9]
app_info = lines[26]
info = json.loads(app_info)
assert info["access_log"] is expected

View File

@@ -13,7 +13,7 @@ from sanic.exceptions import PyFileError
@contextmanager
def temp_path():
""" a simple cross platform replacement for NamedTemporaryFile """
"""a simple cross platform replacement for NamedTemporaryFile"""
with TemporaryDirectory() as td:
yield Path(td, "file")

View File

@@ -1,6 +1,4 @@
from crypt import methods
from sanic import text
from sanic import Sanic, text
from sanic.constants import HTTP_METHODS, HTTPMethod
@@ -14,7 +12,7 @@ def test_string_compat():
assert HTTPMethod.GET.upper() == "GET"
def test_use_in_routes(app):
def test_use_in_routes(app: Sanic):
@app.route("/", methods=[HTTPMethod.GET, HTTPMethod.POST])
def handler(_):
return text("It works")

View File

@@ -1,6 +1,5 @@
import asyncio
from queue import Queue
from threading import Event
from sanic.response import text
@@ -13,8 +12,6 @@ def test_create_task(app):
await asyncio.sleep(0.05)
e.set()
app.add_task(coro)
@app.route("/early")
def not_set(request):
return text(str(e.is_set()))
@@ -24,24 +21,30 @@ def test_create_task(app):
await asyncio.sleep(0.1)
return text(str(e.is_set()))
app.add_task(coro)
request, response = app.test_client.get("/early")
assert response.body == b"False"
app.signal_router.reset()
app.add_task(coro)
request, response = app.test_client.get("/late")
assert response.body == b"True"
def test_create_task_with_app_arg(app):
q = Queue()
@app.after_server_start
async def setup_q(app, _):
app.ctx.q = asyncio.Queue()
@app.route("/")
def not_set(request):
return "hello"
async def not_set(request):
return text(await request.app.ctx.q.get())
async def coro(app):
q.put(app.name)
await app.ctx.q.put(app.name)
app.add_task(coro)
request, response = app.test_client.get("/")
assert q.get() == "test_create_task_with_app_arg"
_, response = app.test_client.get("/")
assert response.text == "test_create_task_with_app_arg"

View File

@@ -1,10 +1,10 @@
import pytest
from sanic import Sanic
from sanic.errorpages import exception_response
from sanic.exceptions import NotFound
from sanic.errorpages import HTMLRenderer, exception_response
from sanic.exceptions import NotFound, SanicException
from sanic.request import Request
from sanic.response import HTTPResponse
from sanic.response import HTTPResponse, html, json, text
@pytest.fixture
@@ -20,7 +20,7 @@ def app():
@pytest.fixture
def fake_request(app):
return Request(b"/foobar", {}, "1.1", "GET", None, app)
return Request(b"/foobar", {"accept": "*/*"}, "1.1", "GET", None, app)
@pytest.mark.parametrize(
@@ -47,7 +47,13 @@ def test_should_return_html_valid_setting(
try:
raise exception("bad stuff")
except Exception as e:
response = exception_response(fake_request, e, True)
response = exception_response(
fake_request,
e,
True,
base=HTMLRenderer,
fallback=fake_request.app.config.FALLBACK_ERROR_FORMAT,
)
assert isinstance(response, HTTPResponse)
assert response.status == status
@@ -74,13 +80,194 @@ def test_auto_fallback_with_content_type(app):
app.config.FALLBACK_ERROR_FORMAT = "auto"
_, response = app.test_client.get(
"/error", headers={"content-type": "application/json"}
"/error", headers={"content-type": "application/json", "accept": "*/*"}
)
assert response.status == 500
assert response.content_type == "application/json"
_, response = app.test_client.get(
"/error", headers={"content-type": "text/plain"}
"/error", headers={"content-type": "foo/bar", "accept": "*/*"}
)
assert response.status == 500
assert response.content_type == "text/html; charset=utf-8"
def test_route_error_format_set_on_auto(app):
@app.get("/text")
def text_response(request):
return text(request.route.ctx.error_format)
@app.get("/json")
def json_response(request):
return json({"format": request.route.ctx.error_format})
@app.get("/html")
def html_response(request):
return html(request.route.ctx.error_format)
_, response = app.test_client.get("/text")
assert response.text == "text"
_, response = app.test_client.get("/json")
assert response.json["format"] == "json"
_, response = app.test_client.get("/html")
assert response.text == "html"
def test_route_error_response_from_auto_route(app):
@app.get("/text")
def text_response(request):
raise Exception("oops")
return text("Never gonna see this")
@app.get("/json")
def json_response(request):
raise Exception("oops")
return json({"message": "Never gonna see this"})
@app.get("/html")
def html_response(request):
raise Exception("oops")
return html("<h1>Never gonna see this</h1>")
_, response = app.test_client.get("/text")
assert response.content_type == "text/plain; charset=utf-8"
_, response = app.test_client.get("/json")
assert response.content_type == "application/json"
_, response = app.test_client.get("/html")
assert response.content_type == "text/html; charset=utf-8"
def test_route_error_response_from_explicit_format(app):
@app.get("/text", error_format="json")
def text_response(request):
raise Exception("oops")
return text("Never gonna see this")
@app.get("/json", error_format="text")
def json_response(request):
raise Exception("oops")
return json({"message": "Never gonna see this"})
_, response = app.test_client.get("/text")
assert response.content_type == "application/json"
_, response = app.test_client.get("/json")
assert response.content_type == "text/plain; charset=utf-8"
def test_unknown_fallback_format(app):
with pytest.raises(SanicException, match="Unknown format: bad"):
app.config.FALLBACK_ERROR_FORMAT = "bad"
def test_route_error_format_unknown(app):
with pytest.raises(SanicException, match="Unknown format: bad"):
@app.get("/text", error_format="bad")
def handler(request):
...
def test_fallback_with_content_type_mismatch_accept(app):
app.config.FALLBACK_ERROR_FORMAT = "auto"
_, response = app.test_client.get(
"/error",
headers={"content-type": "application/json", "accept": "text/plain"},
)
assert response.status == 500
assert response.content_type == "text/plain; charset=utf-8"
_, response = app.test_client.get(
"/error",
headers={"content-type": "text/plain", "accept": "foo/bar"},
)
assert response.status == 500
assert response.content_type == "text/html; charset=utf-8"
app.router.reset()
@app.route("/alt1")
@app.route("/alt2", error_format="text")
@app.route("/alt3", error_format="html")
def handler(_):
raise Exception("problem here")
# Yes, we know this return value is unreachable. This is on purpose.
return json({})
app.router.finalize()
_, response = app.test_client.get(
"/alt1",
headers={"accept": "foo/bar"},
)
assert response.status == 500
assert response.content_type == "text/html; charset=utf-8"
_, response = app.test_client.get(
"/alt1",
headers={"accept": "foo/bar,*/*"},
)
assert response.status == 500
assert response.content_type == "application/json"
_, response = app.test_client.get(
"/alt2",
headers={"accept": "foo/bar"},
)
assert response.status == 500
assert response.content_type == "text/html; charset=utf-8"
_, response = app.test_client.get(
"/alt2",
headers={"accept": "foo/bar,*/*"},
)
assert response.status == 500
assert response.content_type == "text/plain; charset=utf-8"
_, response = app.test_client.get(
"/alt3",
headers={"accept": "foo/bar"},
)
assert response.status == 500
assert response.content_type == "text/html; charset=utf-8"
@pytest.mark.parametrize(
"accept,content_type,expected",
(
(None, None, "text/plain; charset=utf-8"),
("foo/bar", None, "text/html; charset=utf-8"),
("application/json", None, "application/json"),
("application/json,text/plain", None, "application/json"),
("text/plain,application/json", None, "application/json"),
("text/plain,foo/bar", None, "text/plain; charset=utf-8"),
# Following test is valid after v22.3
# ("text/plain,text/html", None, "text/plain; charset=utf-8"),
("*/*", "foo/bar", "text/html; charset=utf-8"),
("*/*", "application/json", "application/json"),
),
)
def test_combinations_for_auto(fake_request, accept, content_type, expected):
if accept:
fake_request.headers["accept"] = accept
else:
del fake_request.headers["accept"]
if content_type:
fake_request.headers["content-type"] = content_type
try:
raise Exception("bad stuff")
except Exception as e:
response = exception_response(
fake_request,
e,
True,
base=HTMLRenderer,
fallback="auto",
)
assert response.content_type == expected

View File

@@ -1,3 +1,4 @@
import logging
import warnings
import pytest
@@ -15,6 +16,7 @@ from sanic.exceptions import (
abort,
)
from sanic.response import text
from websockets.version import version as websockets_version
class SanicExceptionTestException(Exception):
@@ -232,3 +234,41 @@ def test_sanic_exception(exception_app):
request, response = exception_app.test_client.get("/old_abort")
assert response.status == 500
assert len(w) == 1 and "deprecated" in w[0].message.args[0]
def test_custom_exception_default_message(exception_app):
class TeaError(SanicException):
message = "Tempest in a teapot"
status_code = 418
exception_app.router.reset()
@exception_app.get("/tempest")
def tempest(_):
raise TeaError
_, response = exception_app.test_client.get("/tempest", debug=True)
assert response.status == 418
assert b"Tempest in a teapot" in response.body
def test_exception_in_ws_logged(caplog):
app = Sanic(__file__)
@app.websocket("/feed")
async def feed(request, ws):
raise Exception("...")
with caplog.at_level(logging.INFO):
app.test_client.websocket("/feed")
# Websockets v10.0 and above output an additional
# INFO message when a ws connection is accepted
ws_version_parts = websockets_version.split(".")
ws_major = int(ws_version_parts[0])
record_index = 2 if ws_major >= 10 else 1
assert caplog.record_tuples[record_index][0] == "sanic.error"
assert caplog.record_tuples[record_index][1] == logging.ERROR
assert (
"Exception occurred while handling uri:"
in caplog.record_tuples[record_index][2]
)

View File

@@ -1,5 +1,7 @@
import asyncio
import pytest
from bs4 import BeautifulSoup
from sanic import Sanic
@@ -8,9 +10,6 @@ from sanic.handlers import ErrorHandler
from sanic.response import stream, text
exception_handler_app = Sanic("test_exception_handler")
async def sample_streaming_fn(response):
await response.write("foo,")
await asyncio.sleep(0.001)
@@ -21,113 +20,107 @@ class ErrorWithRequestCtx(ServerError):
pass
@exception_handler_app.route("/1")
def handler_1(request):
raise InvalidUsage("OK")
@pytest.fixture
def exception_handler_app():
exception_handler_app = Sanic("test_exception_handler")
@exception_handler_app.route("/1", error_format="html")
def handler_1(request):
raise InvalidUsage("OK")
@exception_handler_app.route("/2", error_format="html")
def handler_2(request):
raise ServerError("OK")
@exception_handler_app.route("/3", error_format="html")
def handler_3(request):
raise NotFound("OK")
@exception_handler_app.route("/4", error_format="html")
def handler_4(request):
foo = bar # noqa -- F821
return text(foo)
@exception_handler_app.route("/5", error_format="html")
def handler_5(request):
class CustomServerError(ServerError):
pass
raise CustomServerError("Custom server error")
@exception_handler_app.route("/6/<arg:int>", error_format="html")
def handler_6(request, arg):
try:
foo = 1 / arg
except Exception as e:
raise e from ValueError(f"{arg}")
return text(foo)
@exception_handler_app.route("/7", error_format="html")
def handler_7(request):
raise Forbidden("go away!")
@exception_handler_app.route("/8", error_format="html")
def handler_8(request):
raise ErrorWithRequestCtx("OK")
@exception_handler_app.exception(ErrorWithRequestCtx, NotFound)
def handler_exception_with_ctx(request, exception):
return text(request.ctx.middleware_ran)
@exception_handler_app.exception(ServerError)
def handler_exception(request, exception):
return text("OK")
@exception_handler_app.exception(Forbidden)
async def async_handler_exception(request, exception):
return stream(
sample_streaming_fn,
content_type="text/csv",
)
@exception_handler_app.middleware
async def some_request_middleware(request):
request.ctx.middleware_ran = "Done."
return exception_handler_app
@exception_handler_app.route("/2")
def handler_2(request):
raise ServerError("OK")
@exception_handler_app.route("/3")
def handler_3(request):
raise NotFound("OK")
@exception_handler_app.route("/4")
def handler_4(request):
foo = bar # noqa -- F821 undefined name 'bar' is done to throw exception
return text(foo)
@exception_handler_app.route("/5")
def handler_5(request):
class CustomServerError(ServerError):
pass
raise CustomServerError("Custom server error")
@exception_handler_app.route("/6/<arg:int>")
def handler_6(request, arg):
try:
foo = 1 / arg
except Exception as e:
raise e from ValueError(f"{arg}")
return text(foo)
@exception_handler_app.route("/7")
def handler_7(request):
raise Forbidden("go away!")
@exception_handler_app.route("/8")
def handler_8(request):
raise ErrorWithRequestCtx("OK")
@exception_handler_app.exception(ErrorWithRequestCtx, NotFound)
def handler_exception_with_ctx(request, exception):
return text(request.ctx.middleware_ran)
@exception_handler_app.exception(ServerError)
def handler_exception(request, exception):
return text("OK")
@exception_handler_app.exception(Forbidden)
async def async_handler_exception(request, exception):
return stream(
sample_streaming_fn,
content_type="text/csv",
)
@exception_handler_app.middleware
async def some_request_middleware(request):
request.ctx.middleware_ran = "Done."
def test_invalid_usage_exception_handler():
def test_invalid_usage_exception_handler(exception_handler_app):
request, response = exception_handler_app.test_client.get("/1")
assert response.status == 400
def test_server_error_exception_handler():
def test_server_error_exception_handler(exception_handler_app):
request, response = exception_handler_app.test_client.get("/2")
assert response.status == 200
assert response.text == "OK"
def test_not_found_exception_handler():
def test_not_found_exception_handler(exception_handler_app):
request, response = exception_handler_app.test_client.get("/3")
assert response.status == 200
def test_text_exception__handler():
def test_text_exception__handler(exception_handler_app):
request, response = exception_handler_app.test_client.get("/random")
assert response.status == 200
assert response.text == "Done."
def test_async_exception_handler():
def test_async_exception_handler(exception_handler_app):
request, response = exception_handler_app.test_client.get("/7")
assert response.status == 200
assert response.text == "foo,bar"
def test_html_traceback_output_in_debug_mode():
def test_html_traceback_output_in_debug_mode(exception_handler_app):
request, response = exception_handler_app.test_client.get("/4", debug=True)
assert response.status == 500
soup = BeautifulSoup(response.body, "html.parser")
html = str(soup)
assert "response = handler(request, **kwargs)" in html
assert "handler_4" in html
assert "foo = bar" in html
@@ -137,12 +130,12 @@ def test_html_traceback_output_in_debug_mode():
) == summary_text
def test_inherited_exception_handler():
def test_inherited_exception_handler(exception_handler_app):
request, response = exception_handler_app.test_client.get("/5")
assert response.status == 200
def test_chained_exception_handler():
def test_chained_exception_handler(exception_handler_app):
request, response = exception_handler_app.test_client.get(
"/6/0", debug=True
)
@@ -151,11 +144,9 @@ def test_chained_exception_handler():
soup = BeautifulSoup(response.body, "html.parser")
html = str(soup)
assert "response = handler(request, **kwargs)" in html
assert "handler_6" in html
assert "foo = 1 / arg" in html
assert "ValueError" in html
assert "The above exception was the direct cause" in html
summary_text = " ".join(soup.select(".summary")[0].text.split())
assert (
@@ -163,7 +154,7 @@ def test_chained_exception_handler():
) == summary_text
def test_exception_handler_lookup():
def test_exception_handler_lookup(exception_handler_app):
class CustomError(Exception):
pass
@@ -186,26 +177,32 @@ def test_exception_handler_lookup():
class ModuleNotFoundError(ImportError):
pass
handler = ErrorHandler()
handler = ErrorHandler("auto")
handler.add(ImportError, import_error_handler)
handler.add(CustomError, custom_error_handler)
handler.add(ServerError, server_error_handler)
assert handler.lookup(ImportError()) == import_error_handler
assert handler.lookup(ModuleNotFoundError()) == import_error_handler
assert handler.lookup(CustomError()) == custom_error_handler
assert handler.lookup(ServerError("Error")) == server_error_handler
assert handler.lookup(CustomServerError("Error")) == server_error_handler
assert handler.lookup(ImportError(), None) == import_error_handler
assert handler.lookup(ModuleNotFoundError(), None) == import_error_handler
assert handler.lookup(CustomError(), None) == custom_error_handler
assert handler.lookup(ServerError("Error"), None) == server_error_handler
assert (
handler.lookup(CustomServerError("Error"), None)
== server_error_handler
)
# once again to ensure there is no caching bug
assert handler.lookup(ImportError()) == import_error_handler
assert handler.lookup(ModuleNotFoundError()) == import_error_handler
assert handler.lookup(CustomError()) == custom_error_handler
assert handler.lookup(ServerError("Error")) == server_error_handler
assert handler.lookup(CustomServerError("Error")) == server_error_handler
assert handler.lookup(ImportError(), None) == import_error_handler
assert handler.lookup(ModuleNotFoundError(), None) == import_error_handler
assert handler.lookup(CustomError(), None) == custom_error_handler
assert handler.lookup(ServerError("Error"), None) == server_error_handler
assert (
handler.lookup(CustomServerError("Error"), None)
== server_error_handler
)
def test_exception_handler_processed_request_middleware():
def test_exception_handler_processed_request_middleware(exception_handler_app):
request, response = exception_handler_app.test_client.get("/8")
assert response.status == 200
assert response.text == "Done."

View File

@@ -0,0 +1,46 @@
import asyncio
import logging
import time
from collections import Counter
from multiprocessing import Process
import httpx
PORT = 42101
def test_no_exceptions_when_cancel_pending_request(app, caplog):
app.config.GRACEFUL_SHUTDOWN_TIMEOUT = 1
@app.get("/")
async def handler(request):
await asyncio.sleep(5)
@app.after_server_start
def shutdown(app, _):
time.sleep(0.2)
app.stop()
def ping():
time.sleep(0.1)
response = httpx.get("http://127.0.0.1:8000")
print(response.status_code)
p = Process(target=ping)
p.start()
with caplog.at_level(logging.INFO):
app.run()
p.kill()
counter = Counter([r[1] for r in caplog.record_tuples])
assert counter[logging.INFO] == 5
assert logging.ERROR not in counter
assert (
caplog.record_tuples[3][2]
== "Request: GET http://127.0.0.1:8000/ stopped. Transport is closed."
)

View File

@@ -0,0 +1,39 @@
from uuid import UUID
import pytest
from sanic import json
@pytest.mark.parametrize(
"idx,path,expectation",
(
(0, "/abc", "str"),
(1, "/123", "int"),
(2, "/123.5", "float"),
(3, "/8af729fe-2b94-4a95-a168-c07068568429", "UUID"),
),
)
def test_annotated_handlers(app, idx, path, expectation):
def build_response(num, foo):
return json({"num": num, "type": type(foo).__name__})
@app.get("/<foo>")
def handler0(_, foo: str):
return build_response(0, foo)
@app.get("/<foo>")
def handler1(_, foo: int):
return build_response(1, foo)
@app.get("/<foo>")
def handler2(_, foo: float):
return build_response(2, foo)
@app.get("/<foo>")
def handler3(_, foo: UUID):
return build_response(3, foo)
_, response = app.test_client.get(path)
assert response.json["num"] == idx
assert response.json["type"] == expectation

View File

@@ -3,8 +3,9 @@ from unittest.mock import Mock
import pytest
from sanic import headers, text
from sanic.exceptions import PayloadTooLarge
from sanic.exceptions import InvalidHeader, PayloadTooLarge
from sanic.http import Http
from sanic.request import Request
@pytest.fixture
@@ -182,3 +183,187 @@ def test_request_line(app):
)
assert request.request_line == b"GET / HTTP/1.1"
@pytest.mark.parametrize(
"raw",
(
"show/first, show/second",
"show/*, show/first",
"*/*, show/first",
"*/*, show/*",
"other/*; q=0.1, show/*; q=0.2",
"show/first; q=0.5, show/second; q=0.5",
"show/first; foo=bar, show/second; foo=bar",
"show/second, show/first; foo=bar",
"show/second; q=0.5, show/first; foo=bar; q=0.5",
"show/second; q=0.5, show/first; q=1.0",
"show/first, show/second; q=1.0",
),
)
def test_parse_accept_ordered_okay(raw):
ordered = headers.parse_accept(raw)
expected_subtype = (
"*" if all(q.subtype.is_wildcard for q in ordered) else "first"
)
assert ordered[0].type_ == "show"
assert ordered[0].subtype == expected_subtype
@pytest.mark.parametrize(
"raw",
(
"missing",
"missing/",
"/missing",
),
)
def test_bad_accept(raw):
with pytest.raises(InvalidHeader):
headers.parse_accept(raw)
def test_empty_accept():
assert headers.parse_accept("") == []
def test_wildcard_accept_set_ok():
accept = headers.parse_accept("*/*")[0]
assert accept.type_.is_wildcard
assert accept.subtype.is_wildcard
accept = headers.parse_accept("foo/bar")[0]
assert not accept.type_.is_wildcard
assert not accept.subtype.is_wildcard
def test_accept_parsed_against_str():
accept = headers.Accept.parse("foo/bar")
assert accept > "foo/bar; q=0.1"
def test_media_type_equality():
assert headers.MediaType("foo") == headers.MediaType("foo") == "foo"
assert headers.MediaType("foo") == headers.MediaType("*") == "*"
assert headers.MediaType("foo") != headers.MediaType("bar")
assert headers.MediaType("foo") != "bar"
def test_media_type_matching():
assert headers.MediaType("foo").match(headers.MediaType("foo"))
assert headers.MediaType("foo").match("foo")
assert not headers.MediaType("foo").match(headers.MediaType("*"))
assert not headers.MediaType("foo").match("*")
assert not headers.MediaType("foo").match(headers.MediaType("bar"))
assert not headers.MediaType("foo").match("bar")
@pytest.mark.parametrize(
"value,other,outcome,allow_type,allow_subtype",
(
# ALLOW BOTH
("foo/bar", "foo/bar", True, True, True),
("foo/bar", headers.Accept.parse("foo/bar"), True, True, True),
("foo/bar", "foo/*", True, True, True),
("foo/bar", headers.Accept.parse("foo/*"), True, True, True),
("foo/bar", "*/*", True, True, True),
("foo/bar", headers.Accept.parse("*/*"), True, True, True),
("foo/*", "foo/bar", True, True, True),
("foo/*", headers.Accept.parse("foo/bar"), True, True, True),
("foo/*", "foo/*", True, True, True),
("foo/*", headers.Accept.parse("foo/*"), True, True, True),
("foo/*", "*/*", True, True, True),
("foo/*", headers.Accept.parse("*/*"), True, True, True),
("*/*", "foo/bar", True, True, True),
("*/*", headers.Accept.parse("foo/bar"), True, True, True),
("*/*", "foo/*", True, True, True),
("*/*", headers.Accept.parse("foo/*"), True, True, True),
("*/*", "*/*", True, True, True),
("*/*", headers.Accept.parse("*/*"), True, True, True),
# ALLOW TYPE
("foo/bar", "foo/bar", True, True, False),
("foo/bar", headers.Accept.parse("foo/bar"), True, True, False),
("foo/bar", "foo/*", False, True, False),
("foo/bar", headers.Accept.parse("foo/*"), False, True, False),
("foo/bar", "*/*", False, True, False),
("foo/bar", headers.Accept.parse("*/*"), False, True, False),
("foo/*", "foo/bar", False, True, False),
("foo/*", headers.Accept.parse("foo/bar"), False, True, False),
("foo/*", "foo/*", False, True, False),
("foo/*", headers.Accept.parse("foo/*"), False, True, False),
("foo/*", "*/*", False, True, False),
("foo/*", headers.Accept.parse("*/*"), False, True, False),
("*/*", "foo/bar", False, True, False),
("*/*", headers.Accept.parse("foo/bar"), False, True, False),
("*/*", "foo/*", False, True, False),
("*/*", headers.Accept.parse("foo/*"), False, True, False),
("*/*", "*/*", False, True, False),
("*/*", headers.Accept.parse("*/*"), False, True, False),
# ALLOW SUBTYPE
("foo/bar", "foo/bar", True, False, True),
("foo/bar", headers.Accept.parse("foo/bar"), True, False, True),
("foo/bar", "foo/*", True, False, True),
("foo/bar", headers.Accept.parse("foo/*"), True, False, True),
("foo/bar", "*/*", False, False, True),
("foo/bar", headers.Accept.parse("*/*"), False, False, True),
("foo/*", "foo/bar", True, False, True),
("foo/*", headers.Accept.parse("foo/bar"), True, False, True),
("foo/*", "foo/*", True, False, True),
("foo/*", headers.Accept.parse("foo/*"), True, False, True),
("foo/*", "*/*", False, False, True),
("foo/*", headers.Accept.parse("*/*"), False, False, True),
("*/*", "foo/bar", False, False, True),
("*/*", headers.Accept.parse("foo/bar"), False, False, True),
("*/*", "foo/*", False, False, True),
("*/*", headers.Accept.parse("foo/*"), False, False, True),
("*/*", "*/*", False, False, True),
("*/*", headers.Accept.parse("*/*"), False, False, True),
),
)
def test_accept_matching(value, other, outcome, allow_type, allow_subtype):
assert (
headers.Accept.parse(value).match(
other,
allow_type_wildcard=allow_type,
allow_subtype_wildcard=allow_subtype,
)
is outcome
)
@pytest.mark.parametrize("value", ("foo/bar", "foo/*", "*/*"))
def test_value_in_accept(value):
acceptable = headers.parse_accept(value)
assert "foo/bar" in acceptable
assert "foo/*" in acceptable
assert "*/*" in acceptable
@pytest.mark.parametrize("value", ("foo/bar", "foo/*"))
def test_value_not_in_accept(value):
acceptable = headers.parse_accept(value)
assert "no/match" not in acceptable
assert "no/*" not in acceptable
@pytest.mark.parametrize(
"header,expected",
(
(
"text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", # noqa: E501
[
"text/html",
"application/xhtml+xml",
"image/avif",
"image/webp",
"application/xml;q=0.9",
"*/*;q=0.8",
],
),
),
)
def test_browser_headers(header, expected):
request = Request(b"/", {"accept": header}, "1.1", "GET", None, None)
assert request.accept == expected

137
tests/test_http.py Normal file
View File

@@ -0,0 +1,137 @@
import asyncio
import json as stdjson
from collections import namedtuple
from textwrap import dedent
from typing import AnyStr
import pytest
from sanic_testing.reusable import ReusableClient
from sanic import json, text
from sanic.app import Sanic
PORT = 1234
class RawClient:
CRLF = b"\r\n"
def __init__(self, host: str, port: int):
self.reader = None
self.writer = None
self.host = host
self.port = port
async def connect(self):
self.reader, self.writer = await asyncio.open_connection(
self.host, self.port
)
async def close(self):
self.writer.close()
await self.writer.wait_closed()
async def send(self, message: AnyStr):
if isinstance(message, str):
msg = self._clean(message).encode("utf-8")
else:
msg = message
await self._send(msg)
async def _send(self, message: bytes):
if not self.writer:
raise Exception("No open write stream")
self.writer.write(message)
async def recv(self, nbytes: int = -1) -> bytes:
if not self.reader:
raise Exception("No open read stream")
return await self.reader.read(nbytes)
def _clean(self, message: str) -> str:
return (
dedent(message)
.lstrip("\n")
.replace("\n", self.CRLF.decode("utf-8"))
)
@pytest.fixture
def test_app(app: Sanic):
app.config.KEEP_ALIVE_TIMEOUT = 1
@app.get("/")
async def base_handler(request):
return text("111122223333444455556666777788889999")
@app.post("/upload", stream=True)
async def upload_handler(request):
data = [part.decode("utf-8") async for part in request.stream]
return json(data)
return app
@pytest.fixture
def runner(test_app):
client = ReusableClient(test_app, port=PORT)
client.run()
yield client
client.stop()
@pytest.fixture
def client(runner):
client = namedtuple("Client", ("raw", "send", "recv"))
raw = RawClient(runner.host, runner.port)
runner._run(raw.connect())
def send(msg):
nonlocal runner
nonlocal raw
runner._run(raw.send(msg))
def recv(**kwargs):
nonlocal runner
nonlocal raw
method = raw.recv_until if "until" in kwargs else raw.recv
return runner._run(method(**kwargs))
yield client(raw, send, recv)
runner._run(raw.close())
def test_full_message(client):
client.send(
"""
GET / HTTP/1.1
host: localhost:7777
"""
)
response = client.recv()
assert len(response) == 140
assert b"200 OK" in response
def test_transfer_chunked(client):
client.send(
"""
POST /upload HTTP/1.1
transfer-encoding: chunked
"""
)
client.send(b"3\r\nfoo\r\n")
client.send(b"3\r\nbar\r\n")
client.send(b"0\r\n\r\n")
response = client.recv()
_, body = response.rsplit(b"\r\n\r\n", 1)
data = stdjson.loads(body)
assert data == ["foo", "bar"]

View File

@@ -2,16 +2,13 @@ import asyncio
import platform
from asyncio import sleep as aio_sleep
from json import JSONDecodeError
from os import environ
import httpcore
import httpx
import pytest
from sanic_testing.testing import HOST, SanicTestClient
from sanic_testing.reusable import ReusableClient
from sanic import Sanic, server
from sanic import Sanic
from sanic.compat import OS_IS_WINDOWS
from sanic.response import text
@@ -21,164 +18,6 @@ CONFIG_FOR_TESTS = {"KEEP_ALIVE_TIMEOUT": 2, "KEEP_ALIVE": True}
PORT = 42101 # test_keep_alive_timeout_reuse doesn't work with random port
class ReusableSanicConnectionPool(httpcore.AsyncConnectionPool):
last_reused_connection = None
async def _get_connection_from_pool(self, *args, **kwargs):
conn = await super()._get_connection_from_pool(*args, **kwargs)
self.__class__.last_reused_connection = conn
return conn
class ResusableSanicSession(httpx.AsyncClient):
def __init__(self, *args, **kwargs) -> None:
transport = ReusableSanicConnectionPool()
super().__init__(transport=transport, *args, **kwargs)
class ReuseableSanicTestClient(SanicTestClient):
def __init__(self, app, loop=None):
super().__init__(app)
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
self._server = None
self._tcp_connector = None
self._session = None
def get_new_session(self):
return ResusableSanicSession()
# Copied from SanicTestClient, but with some changes to reuse the
# same loop for the same app.
def _sanic_endpoint_test(
self,
method="get",
uri="/",
gather_request=True,
debug=False,
server_kwargs=None,
*request_args,
**request_kwargs,
):
loop = self._loop
results = [None, None]
exceptions = []
server_kwargs = server_kwargs or {"return_asyncio_server": True}
if gather_request:
def _collect_request(request):
if results[0] is None:
results[0] = request
self.app.request_middleware.appendleft(_collect_request)
if uri.startswith(
("http:", "https:", "ftp:", "ftps://", "//", "ws:", "wss:")
):
url = uri
else:
uri = uri if uri.startswith("/") else f"/{uri}"
scheme = "http"
url = f"{scheme}://{HOST}:{PORT}{uri}"
@self.app.listener("after_server_start")
async def _collect_response(loop):
try:
response = await self._local_request(
method, url, *request_args, **request_kwargs
)
results[-1] = response
except Exception as e2:
exceptions.append(e2)
if self._server is not None:
_server = self._server
else:
_server_co = self.app.create_server(
host=HOST, debug=debug, port=PORT, **server_kwargs
)
server.trigger_events(
self.app.listeners["before_server_start"], loop
)
try:
loop._stopping = False
_server = loop.run_until_complete(_server_co)
except Exception as e1:
raise e1
self._server = _server
server.trigger_events(self.app.listeners["after_server_start"], loop)
self.app.listeners["after_server_start"].pop()
if exceptions:
raise ValueError(f"Exception during request: {exceptions}")
if gather_request:
self.app.request_middleware.pop()
try:
request, response = results
return request, response
except Exception:
raise ValueError(
f"Request and response object expected, got ({results})"
)
else:
try:
return results[-1]
except Exception:
raise ValueError(f"Request object expected, got ({results})")
def kill_server(self):
try:
if self._server:
self._server.close()
self._loop.run_until_complete(self._server.wait_closed())
self._server = None
if self._session:
self._loop.run_until_complete(self._session.aclose())
self._session = None
except Exception as e3:
raise e3
# Copied from SanicTestClient, but with some changes to reuse the
# same TCPConnection and the sane ClientSession more than once.
# Note, you cannot use the same session if you are in a _different_
# loop, so the changes above are required too.
async def _local_request(self, method, url, *args, **kwargs):
raw_cookies = kwargs.pop("raw_cookies", None)
request_keepalive = kwargs.pop(
"request_keepalive", CONFIG_FOR_TESTS["KEEP_ALIVE_TIMEOUT"]
)
if not self._session:
self._session = self.get_new_session()
try:
response = await getattr(self._session, method.lower())(
url, timeout=request_keepalive, *args, **kwargs
)
except NameError:
raise Exception(response.status_code)
try:
response.json = response.json()
except (JSONDecodeError, UnicodeDecodeError):
response.json = None
response.body = await response.aread()
response.status = response.status_code
response.content_type = response.headers.get("content-type")
if raw_cookies:
response.raw_cookies = {}
for cookie in response.cookies:
response.raw_cookies[cookie.name] = cookie
return response
keep_alive_timeout_app_reuse = Sanic("test_ka_timeout_reuse")
keep_alive_app_client_timeout = Sanic("test_ka_client_timeout")
keep_alive_app_server_timeout = Sanic("test_ka_server_timeout")
@@ -224,21 +63,22 @@ def test_keep_alive_timeout_reuse():
"""If the server keep-alive timeout and client keep-alive timeout are
both longer than the delay, the client _and_ server will successfully
reuse the existing connection."""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = ReuseableSanicTestClient(keep_alive_timeout_app_reuse, loop)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = ReusableClient(keep_alive_timeout_app_reuse, loop=loop, port=PORT)
with client:
headers = {"Connection": "keep-alive"}
request, response = client.get("/1", headers=headers)
assert response.status == 200
assert response.text == "OK"
assert request.protocol.state["requests_count"] == 1
loop.run_until_complete(aio_sleep(1))
request, response = client.get("/1")
assert response.status == 200
assert response.text == "OK"
assert ReusableSanicConnectionPool.last_reused_connection
finally:
client.kill_server()
assert request.protocol.state["requests_count"] == 2
@pytest.mark.skipif(
@@ -250,22 +90,22 @@ def test_keep_alive_timeout_reuse():
def test_keep_alive_client_timeout():
"""If the server keep-alive timeout is longer than the client
keep-alive timeout, client will try to create a new connection here."""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = ReuseableSanicTestClient(keep_alive_app_client_timeout, loop)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = ReusableClient(
keep_alive_app_client_timeout, loop=loop, port=PORT
)
with client:
headers = {"Connection": "keep-alive"}
_, response = client.get("/1", headers=headers, request_keepalive=1)
request, response = client.get("/1", headers=headers, timeout=1)
assert response.status == 200
assert response.text == "OK"
assert request.protocol.state["requests_count"] == 1
loop.run_until_complete(aio_sleep(2))
_, response = client.get("/1", request_keepalive=1)
assert ReusableSanicConnectionPool.last_reused_connection is None
finally:
client.kill_server()
request, response = client.get("/1", timeout=1)
assert request.protocol.state["requests_count"] == 1
@pytest.mark.skipif(
@@ -277,22 +117,23 @@ def test_keep_alive_server_timeout():
keep-alive timeout, the client will either a 'Connection reset' error
_or_ a new connection. Depending on how the event-loop handles the
broken server connection."""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = ReuseableSanicTestClient(keep_alive_app_server_timeout, loop)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = ReusableClient(
keep_alive_app_server_timeout, loop=loop, port=PORT
)
with client:
headers = {"Connection": "keep-alive"}
_, response = client.get("/1", headers=headers, request_keepalive=60)
request, response = client.get("/1", headers=headers, timeout=60)
assert response.status == 200
assert response.text == "OK"
assert request.protocol.state["requests_count"] == 1
loop.run_until_complete(aio_sleep(3))
_, response = client.get("/1", request_keepalive=60)
request, response = client.get("/1", timeout=60)
assert ReusableSanicConnectionPool.last_reused_connection is None
finally:
client.kill_server()
assert request.protocol.state["requests_count"] == 1
@pytest.mark.skipif(
@@ -300,10 +141,10 @@ def test_keep_alive_server_timeout():
reason="Not testable with current client",
)
def test_keep_alive_connection_context():
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = ReuseableSanicTestClient(keep_alive_app_context, loop)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = ReusableClient(keep_alive_app_context, loop=loop, port=PORT)
with client:
headers = {"Connection": "keep-alive"}
request1, _ = client.post("/ctx", headers=headers)
@@ -315,5 +156,4 @@ def test_keep_alive_connection_context():
assert (
request1.conn_info.ctx.foo == request2.conn_info.ctx.foo == "hello"
)
finally:
client.kill_server()
assert request2.protocol.state["requests_count"] == 2

View File

@@ -5,6 +5,7 @@ import uuid
from importlib import reload
from io import StringIO
from unittest.mock import Mock
import pytest
@@ -51,7 +52,7 @@ def test_log(app):
def test_logging_defaults():
# reset_logging()
app = Sanic("test_logging")
Sanic("test_logging")
for fmt in [h.formatter for h in logging.getLogger("sanic.root").handlers]:
assert (
@@ -87,7 +88,7 @@ def test_logging_pass_customer_logconfig():
"format"
] = "%(asctime)s - (%(name)s)[%(levelname)s]: %(message)s"
app = Sanic("test_logging", log_config=modified_config)
Sanic("test_logging", log_config=modified_config)
for fmt in [h.formatter for h in logging.getLogger("sanic.root").handlers]:
assert fmt._fmt == modified_config["formatters"]["generic"]["format"]
@@ -111,11 +112,13 @@ def test_logging_pass_customer_logconfig():
),
)
def test_log_connection_lost(app, debug, monkeypatch):
""" Should not log Connection lost exception on non debug """
"""Should not log Connection lost exception on non debug"""
stream = StringIO()
error = logging.getLogger("sanic.error")
error.addHandler(logging.StreamHandler(stream))
monkeypatch.setattr(sanic.server, "error_logger", error)
monkeypatch.setattr(
sanic.server.protocols.http_protocol, "error_logger", error
)
@app.route("/conn_lost")
async def conn_lost(request):
@@ -208,6 +211,56 @@ def test_logging_modified_root_logger_config():
modified_config = LOGGING_CONFIG_DEFAULTS
modified_config["loggers"]["sanic.root"]["level"] = "DEBUG"
app = Sanic("test_logging", log_config=modified_config)
Sanic("test_logging", log_config=modified_config)
assert logging.getLogger("sanic.root").getEffectiveLevel() == logging.DEBUG
def test_access_log_client_ip_remote_addr(monkeypatch):
access = Mock()
monkeypatch.setattr(sanic.http, "access_logger", access)
app = Sanic("test_logging")
app.config.PROXIES_COUNT = 2
@app.route("/")
async def handler(request):
return text(request.remote_addr)
headers = {"X-Forwarded-For": "1.1.1.1, 2.2.2.2"}
request, response = app.test_client.get("/", headers=headers)
assert request.remote_addr == "1.1.1.1"
access.info.assert_called_with(
"",
extra={
"status": 200,
"byte": len(response.content),
"host": f"{request.remote_addr}:{request.port}",
"request": f"GET {request.scheme}://{request.host}/",
},
)
def test_access_log_client_ip_reqip(monkeypatch):
access = Mock()
monkeypatch.setattr(sanic.http, "access_logger", access)
app = Sanic("test_logging")
@app.route("/")
async def handler(request):
return text(request.ip)
request, response = app.test_client.get("/")
access.info.assert_called_with(
"",
extra={
"status": 200,
"byte": len(response.content),
"host": f"{request.ip}:{request.port}",
"request": f"GET {request.scheme}://{request.host}/",
},
)

View File

@@ -6,85 +6,37 @@ from sanic_testing.testing import PORT
from sanic.config import BASE_LOGO
def test_logo_base(app, caplog):
server = app.create_server(
debug=True, return_asyncio_server=True, port=PORT
)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop._stopping = False
def test_logo_base(app, run_startup):
logs = run_startup(app)
with caplog.at_level(logging.DEBUG):
_server = loop.run_until_complete(server)
_server.close()
loop.run_until_complete(_server.wait_closed())
app.stop()
assert caplog.record_tuples[0][1] == logging.DEBUG
assert caplog.record_tuples[0][2] == BASE_LOGO
assert logs[0][1] == logging.DEBUG
assert logs[0][2] == BASE_LOGO
def test_logo_false(app, caplog):
def test_logo_false(app, caplog, run_startup):
app.config.LOGO = False
server = app.create_server(
debug=True, return_asyncio_server=True, port=PORT
)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop._stopping = False
logs = run_startup(app)
with caplog.at_level(logging.DEBUG):
_server = loop.run_until_complete(server)
_server.close()
loop.run_until_complete(_server.wait_closed())
app.stop()
banner, port = caplog.record_tuples[0][2].rsplit(":", 1)
assert caplog.record_tuples[0][1] == logging.INFO
banner, port = logs[0][2].rsplit(":", 1)
assert logs[0][1] == logging.INFO
assert banner == "Goin' Fast @ http://127.0.0.1"
assert int(port) > 0
def test_logo_true(app, caplog):
def test_logo_true(app, run_startup):
app.config.LOGO = True
server = app.create_server(
debug=True, return_asyncio_server=True, port=PORT
)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop._stopping = False
logs = run_startup(app)
with caplog.at_level(logging.DEBUG):
_server = loop.run_until_complete(server)
_server.close()
loop.run_until_complete(_server.wait_closed())
app.stop()
assert caplog.record_tuples[0][1] == logging.DEBUG
assert caplog.record_tuples[0][2] == BASE_LOGO
assert logs[0][1] == logging.DEBUG
assert logs[0][2] == BASE_LOGO
def test_logo_custom(app, caplog):
def test_logo_custom(app, run_startup):
app.config.LOGO = "My Custom Logo"
server = app.create_server(
debug=True, return_asyncio_server=True, port=PORT
)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop._stopping = False
logs = run_startup(app)
with caplog.at_level(logging.DEBUG):
_server = loop.run_until_complete(server)
_server.close()
loop.run_until_complete(_server.wait_closed())
app.stop()
assert caplog.record_tuples[0][1] == logging.DEBUG
assert caplog.record_tuples[0][2] == "My Custom Logo"
assert logs[0][1] == logging.DEBUG
assert logs[0][2] == "My Custom Logo"

View File

@@ -5,7 +5,7 @@ from itertools import count
from sanic.exceptions import NotFound
from sanic.request import Request
from sanic.response import HTTPResponse, text
from sanic.response import HTTPResponse, json, text
# ------------------------------------------------------------ #
@@ -37,14 +37,19 @@ def test_middleware_request_as_convenience(app):
async def handler1(request):
results.append(request)
@app.route("/")
@app.on_request()
async def handler2(request):
results.append(request)
@app.route("/")
async def handler3(request):
return text("OK")
request, response = app.test_client.get("/")
assert response.text == "OK"
assert type(results[0]) is Request
assert type(results[1]) is Request
def test_middleware_response(app):
@@ -79,7 +84,12 @@ def test_middleware_response_as_convenience(app):
results.append(request)
@app.on_response
async def process_response(request, response):
async def process_response_1(request, response):
results.append(request)
results.append(response)
@app.on_response()
async def process_response_2(request, response):
results.append(request)
results.append(response)
@@ -93,6 +103,8 @@ def test_middleware_response_as_convenience(app):
assert type(results[0]) is Request
assert type(results[1]) is Request
assert isinstance(results[2], HTTPResponse)
assert type(results[3]) is Request
assert isinstance(results[4], HTTPResponse)
def test_middleware_response_as_convenience_called(app):
@@ -271,3 +283,17 @@ def test_request_middleware_executes_once(app):
request, response = app.test_client.get("/")
assert next(i) == 3
def test_middleware_added_response(app):
@app.on_response
def display(_, response):
response["foo"] = "bar"
return json(response)
@app.get("/")
async def handler(request):
return {}
_, response = app.test_client.get("/")
assert response.json["foo"] == "bar"

View File

@@ -140,3 +140,39 @@ def test_ipv6_address_is_not_wrapped(app):
assert resp.json["client"] == "[::1]"
assert resp.json["client_ip"] == "::1"
assert request.ip == "::1"
def test_request_accept():
app = Sanic("req-generator")
@app.get("/")
async def get(request):
return response.empty()
request, _ = app.test_client.get(
"/",
headers={
"Accept": "text/*, text/plain, text/plain;format=flowed, */*"
},
)
assert request.accept == [
"text/plain;format=flowed",
"text/plain",
"text/*",
"*/*",
]
request, _ = app.test_client.get(
"/",
headers={
"Accept": (
"text/plain; q=0.5, text/html, text/x-dvi; q=0.8, text/x-c"
)
},
)
assert request.accept == [
"text/html",
"text/x-c",
"text/x-dvi; q=0.8",
"text/plain; q=0.5",
]

View File

@@ -2,6 +2,7 @@ import asyncio
import httpcore
import httpx
import pytest
from sanic_testing.testing import SanicTestClient
@@ -48,42 +49,51 @@ class DelayableSanicTestClient(SanicTestClient):
return DelayableSanicSession(request_delay=self._request_delay)
request_timeout_default_app = Sanic("test_request_timeout_default")
request_no_timeout_app = Sanic("test_request_no_timeout")
request_timeout_default_app.config.REQUEST_TIMEOUT = 0.6
request_no_timeout_app.config.REQUEST_TIMEOUT = 0.6
@pytest.fixture
def request_no_timeout_app():
app = Sanic("test_request_no_timeout")
app.config.REQUEST_TIMEOUT = 0.6
@app.route("/1")
async def handler2(request):
return text("OK")
return app
@request_timeout_default_app.route("/1")
async def handler1(request):
return text("OK")
@pytest.fixture
def request_timeout_default_app():
app = Sanic("test_request_timeout_default")
app.config.REQUEST_TIMEOUT = 0.6
@app.route("/1")
async def handler1(request):
return text("OK")
@app.websocket("/ws1")
async def ws_handler1(request, ws):
await ws.send("OK")
return app
@request_no_timeout_app.route("/1")
async def handler2(request):
return text("OK")
@request_timeout_default_app.websocket("/ws1")
async def ws_handler1(request, ws):
await ws.send("OK")
def test_default_server_error_request_timeout():
def test_default_server_error_request_timeout(request_timeout_default_app):
client = DelayableSanicTestClient(request_timeout_default_app, 2)
request, response = client.get("/1")
_, response = client.get("/1")
assert response.status == 408
assert "Request Timeout" in response.text
def test_default_server_error_request_dont_timeout():
def test_default_server_error_request_dont_timeout(request_no_timeout_app):
client = DelayableSanicTestClient(request_no_timeout_app, 0.2)
request, response = client.get("/1")
_, response = client.get("/1")
assert response.status == 200
assert response.text == "OK"
def test_default_server_error_websocket_request_timeout():
def test_default_server_error_websocket_request_timeout(
request_timeout_default_app,
):
headers = {
"Upgrade": "websocket",
@@ -93,7 +103,7 @@ def test_default_server_error_websocket_request_timeout():
}
client = DelayableSanicTestClient(request_timeout_default_app, 2)
request, response = client.get("/ws1", headers=headers)
_, response = client.get("/ws1", headers=headers)
assert response.status == 408
assert "Request Timeout" in response.text

View File

@@ -654,41 +654,46 @@ def test_websocket_route_invalid_handler(app):
@pytest.mark.asyncio
@pytest.mark.parametrize("url", ["/ws", "ws"])
async def test_websocket_route_asgi(app, url):
ev = asyncio.Event()
@app.after_server_start
async def setup_ev(app, _):
app.ctx.ev = asyncio.Event()
@app.websocket(url)
async def handler(request, ws):
ev.set()
request.app.ctx.ev.set()
request, response = await app.asgi_client.websocket(url)
assert ev.is_set()
@app.get("/ev")
async def check(request):
return json({"set": request.app.ctx.ev.is_set()})
_, response = await app.asgi_client.websocket(url)
_, response = await app.asgi_client.get("/")
assert response.json["set"]
def test_websocket_route_with_subprotocols(app):
@pytest.mark.parametrize(
"subprotocols,expected",
(
(["one"], "one"),
(["three", "one"], "one"),
(["tree"], None),
(None, None),
),
)
def test_websocket_route_with_subprotocols(app, subprotocols, expected):
results = []
@app.websocket("/ws", subprotocols=["foo", "bar"])
@app.websocket("/ws", subprotocols=["zero", "one", "two", "three"])
async def handler(request, ws):
results.append(ws.subprotocol)
nonlocal results
results = ws.subprotocol
assert ws.subprotocol is not None
_, response = SanicTestClient(app).websocket("/ws", subprotocols=["bar"])
assert response.opened is True
assert results == ["bar"]
_, response = SanicTestClient(app).websocket(
"/ws", subprotocols=["bar", "foo"]
"/ws", subprotocols=subprotocols
)
assert response.opened is True
assert results == ["bar", "bar"]
_, response = SanicTestClient(app).websocket("/ws", subprotocols=["baz"])
assert response.opened is True
assert results == ["bar", "bar", None]
_, response = SanicTestClient(app).websocket("/ws")
assert response.opened is True
assert results == ["bar", "bar", None, None]
assert results == expected
@pytest.mark.parametrize("strict_slashes", [True, False, None])

View File

@@ -8,7 +8,7 @@ import pytest
from sanic_testing.testing import HOST, PORT
from sanic.exceptions import InvalidUsage
from sanic.exceptions import InvalidUsage, SanicException
AVAILABLE_LISTENERS = [
@@ -103,7 +103,11 @@ async def test_trigger_before_events_create_server(app):
async def init_db(app, loop):
app.db = MySanicDb()
await app.create_server(debug=True, return_asyncio_server=True, port=PORT)
srv = await app.create_server(
debug=True, return_asyncio_server=True, port=PORT
)
await srv.startup()
await srv.before_start()
assert hasattr(app, "db")
assert isinstance(app.db, MySanicDb)
@@ -157,14 +161,15 @@ def test_create_server_trigger_events(app):
serv_coro = app.create_server(return_asyncio_server=True, sock=sock)
serv_task = asyncio.ensure_future(serv_coro, loop=loop)
server = loop.run_until_complete(serv_task)
server.after_start()
loop.run_until_complete(server.startup())
loop.run_until_complete(server.after_start())
try:
loop.run_forever()
except KeyboardInterrupt as e:
except KeyboardInterrupt:
loop.stop()
finally:
# Run the on_stop function if provided
server.before_stop()
loop.run_until_complete(server.before_stop())
# Wait for server to close
close_task = server.close()
@@ -174,5 +179,19 @@ def test_create_server_trigger_events(app):
signal.stopped = True
for connection in server.connections:
connection.close_if_idle()
server.after_stop()
loop.run_until_complete(server.after_stop())
assert flag1 and flag2 and flag3
@pytest.mark.asyncio
async def test_missing_startup_raises_exception(app):
@app.listener("before_server_start")
async def init_db(app, loop):
...
srv = await app.create_server(
debug=True, return_asyncio_server=True, port=PORT
)
with pytest.raises(SanicException):
await srv.before_start()

View File

@@ -95,7 +95,7 @@ def test_windows_workaround():
os.kill(os.getpid(), signal.SIGINT)
await asyncio.sleep(0.2)
assert app.is_stopping
assert app.stay_active_task.result() == None
assert app.stay_active_task.result() is None
# Second Ctrl+C should raise
with pytest.raises(KeyboardInterrupt):
os.kill(os.getpid(), signal.SIGINT)

View File

@@ -68,6 +68,7 @@ async def test_dispatch_signal_triggers_multiple_handlers(app):
app.signal_router.finalize()
assert len(app.signal_router.routes) == 3
await app.dispatch("foo.bar.baz")
assert counter == 2
@@ -331,7 +332,8 @@ def test_event_on_bp_not_registered():
"event,expected",
(
("foo.bar.baz", True),
("server.init.before", False),
("server.init.before", True),
("server.init.somethingelse", False),
("http.request.start", False),
("sanic.notice.anything", True),
),

View File

@@ -461,6 +461,22 @@ def test_nested_dir(app, static_file_directory):
assert response.text == "foo\n"
def test_handle_is_a_directory_error(app, static_file_directory):
error_text = "Is a directory. Access denied"
app.static("/static", static_file_directory)
@app.exception(Exception)
async def handleStaticDirError(request, exception):
if isinstance(exception, IsADirectoryError):
return text(error_text, status=403)
raise exception
request, response = app.test_client.get("/static/")
assert response.status == 403
assert response.text == error_text
def test_stack_trace_on_not_found(app, static_file_directory, caplog):
app.static("/static", static_file_directory)
@@ -507,3 +523,56 @@ def test_multiple_statics(app, static_file_directory):
assert response.body == get_file_content(
static_file_directory, "python.png"
)
def test_resource_type_default(app, static_file_directory):
app.static("/static", static_file_directory)
app.static("/file", get_file_path(static_file_directory, "test.file"))
_, response = app.test_client.get("/static")
assert response.status == 404
_, response = app.test_client.get("/file")
assert response.status == 200
assert response.body == get_file_content(
static_file_directory, "test.file"
)
def test_resource_type_file(app, static_file_directory):
app.static(
"/file",
get_file_path(static_file_directory, "test.file"),
resource_type="file",
)
_, response = app.test_client.get("/file")
assert response.status == 200
assert response.body == get_file_content(
static_file_directory, "test.file"
)
with pytest.raises(TypeError):
app.static("/static", static_file_directory, resource_type="file")
def test_resource_type_dir(app, static_file_directory):
app.static("/static", static_file_directory, resource_type="dir")
_, response = app.test_client.get("/static/test.file")
assert response.status == 200
assert response.body == get_file_content(
static_file_directory, "test.file"
)
with pytest.raises(TypeError):
app.static(
"/file",
get_file_path(static_file_directory, "test.file"),
resource_type="dir",
)
def test_resource_type_unknown(app, static_file_directory, caplog):
with pytest.raises(ValueError):
app.static("/static", static_file_directory, resource_type="unknown")

21
tests/test_touchup.py Normal file
View File

@@ -0,0 +1,21 @@
import logging
from sanic.signals import RESERVED_NAMESPACES
from sanic.touchup import TouchUp
def test_touchup_methods(app):
assert len(TouchUp._registry) == 9
async def test_ode_removes_dispatch_events(app, caplog):
with caplog.at_level(logging.DEBUG, logger="sanic.root"):
await app._startup()
logs = caplog.record_tuples
for signal in RESERVED_NAMESPACES["http"]:
assert (
"sanic.root",
logging.DEBUG,
f"Disabling event: {signal}",
) in logs

View File

@@ -43,7 +43,15 @@ def test_routes_with_multiple_hosts(app):
)
def test_websocket_bp_route_name(app):
@pytest.mark.parametrize(
"name,expected",
(
("test_route", "/bp/route"),
("test_route2", "/bp/route2"),
("foobar_3", "/bp/route3"),
),
)
def test_websocket_bp_route_name(app, name, expected):
"""Tests that blueprint websocket route is named."""
event = asyncio.Event()
bp = Blueprint("test_bp", url_prefix="/bp")
@@ -69,22 +77,12 @@ def test_websocket_bp_route_name(app):
uri = app.url_for("test_bp.main")
assert uri == "/bp/main"
uri = app.url_for("test_bp.test_route")
assert uri == "/bp/route"
uri = app.url_for(f"test_bp.{name}")
assert uri == expected
request, response = SanicTestClient(app).websocket(uri)
assert response.opened is True
assert event.is_set()
event.clear()
uri = app.url_for("test_bp.test_route2")
assert uri == "/bp/route2"
request, response = SanicTestClient(app).websocket(uri)
assert response.opened is True
assert event.is_set()
uri = app.url_for("test_bp.foobar_3")
assert uri == "/bp/route3"
# TODO: add test with a route with multiple hosts
# TODO: add test with a route with _host in url_for

View File

@@ -175,7 +175,7 @@ def test_worker_close(worker):
worker.wsgi = mock.Mock()
conn = mock.Mock()
conn.websocket = mock.Mock()
conn.websocket.close_connection = mock.Mock(wraps=_a_noop)
conn.websocket.fail_connection = mock.Mock(wraps=_a_noop)
worker.connections = set([conn])
worker.log = mock.Mock()
worker.loop = loop
@@ -190,5 +190,5 @@ def test_worker_close(worker):
loop.run_until_complete(_close)
assert worker.signal.stopped
assert conn.websocket.close_connection.called
assert conn.websocket.fail_connection.called
assert len(worker.servers) == 0