Merge branch 'master' of github.com:huge-success/sanic into config_from_object_string

This commit is contained in:
Jotagê Sales
2019-03-04 00:37:59 -03:00
47 changed files with 1521 additions and 213 deletions

View File

@@ -0,0 +1,53 @@
from random import choice, seed
from pytest import mark
import sanic.router
seed("Pack my box with five dozen liquor jugs.")
# Disable Caching for testing purpose
sanic.router.ROUTER_CACHE_SIZE = 0
class TestSanicRouteResolution:
@mark.asyncio
async def test_resolve_route_no_arg_string_path(
self, sanic_router, route_generator, benchmark
):
simple_routes = route_generator.generate_random_direct_route(
max_route_depth=4
)
router, simple_routes = sanic_router(route_details=simple_routes)
route_to_call = choice(simple_routes)
result = benchmark.pedantic(
router._get,
("/{}".format(route_to_call[-1]), route_to_call[0], "localhost"),
iterations=1000,
rounds=1000,
)
assert await result[0](None) == 1
@mark.asyncio
async def test_resolve_route_with_typed_args(
self, sanic_router, route_generator, benchmark
):
typed_routes = route_generator.add_typed_parameters(
route_generator.generate_random_direct_route(max_route_depth=4),
max_route_depth=8,
)
router, typed_routes = sanic_router(route_details=typed_routes)
route_to_call = choice(typed_routes)
url = route_generator.generate_url_for_template(
template=route_to_call[-1]
)
print("{} -> {}".format(route_to_call[-1], url))
result = benchmark.pedantic(
router._get,
("/{}".format(url), route_to_call[0], "localhost"),
iterations=1000,
rounds=1000,
)
assert await result[0](None) == 1

View File

@@ -1,12 +1,130 @@
import random
import re
import string
import sys
import uuid
import pytest
from sanic import Sanic
from sanic.router import RouteExists, Router
random.seed("Pack my box with five dozen liquor jugs.")
if sys.platform in ["win32", "cygwin"]:
collect_ignore = ["test_worker.py"]
async def _handler(request):
"""
Dummy placeholder method used for route resolver when creating a new
route into the sanic router. This router is not actually called by the
sanic app. So do not worry about the arguments to this method.
If you change the return value of this method, make sure to propagate the
change to any test case that leverages RouteStringGenerator.
"""
return 1
TYPE_TO_GENERATOR_MAP = {
"string": lambda: "".join(
[random.choice(string.ascii_letters + string.digits) for _ in range(4)]
),
"int": lambda: random.choice(range(1000000)),
"number": lambda: random.random(),
"alpha": lambda: "".join(
[random.choice(string.ascii_letters) for _ in range(4)]
),
"uuid": lambda: str(uuid.uuid1()),
}
class RouteStringGenerator:
ROUTE_COUNT_PER_DEPTH = 100
HTTP_METHODS = ["GET", "PUT", "POST", "PATCH", "DELETE", "OPTION"]
ROUTE_PARAM_TYPES = ["string", "int", "number", "alpha", "uuid"]
def generate_random_direct_route(self, max_route_depth=4):
routes = []
for depth in range(1, max_route_depth + 1):
for _ in range(self.ROUTE_COUNT_PER_DEPTH):
route = "/".join(
[
TYPE_TO_GENERATOR_MAP.get("string")()
for _ in range(depth)
]
)
route = route.replace(".", "", -1)
route_detail = (random.choice(self.HTTP_METHODS), route)
if route_detail not in routes:
routes.append(route_detail)
return routes
def add_typed_parameters(self, current_routes, max_route_depth=8):
routes = []
for method, route in current_routes:
current_length = len(route.split("/"))
new_route_part = "/".join(
[
"<{}:{}>".format(
TYPE_TO_GENERATOR_MAP.get("string")(),
random.choice(self.ROUTE_PARAM_TYPES),
)
for _ in range(max_route_depth - current_length)
]
)
route = "/".join([route, new_route_part])
route = route.replace(".", "", -1)
routes.append((method, route))
return routes
@staticmethod
def generate_url_for_template(template):
url = template
for pattern, param_type in re.findall(
re.compile(r"((?:<\w+:(string|int|number|alpha|uuid)>)+)"),
template,
):
value = TYPE_TO_GENERATOR_MAP.get(param_type)()
url = url.replace(pattern, str(value), -1)
return url
@pytest.fixture(scope="function")
def sanic_router():
# noinspection PyProtectedMember
def _setup(route_details: tuple) -> (Router, tuple):
router = Router()
added_router = []
for method, route in route_details:
try:
router._add(
uri="/{}".format(route),
methods=frozenset({method}),
host="localhost",
handler=_handler,
)
added_router.append((method, route))
except RouteExists:
pass
return router, added_router
return _setup
@pytest.fixture(scope="function")
def route_generator() -> RouteStringGenerator:
return RouteStringGenerator()
@pytest.fixture(scope="function")
def url_param_generator():
return TYPE_TO_GENERATOR_MAP
@pytest.fixture
def app(request):
return Sanic(request.node.name)

View File

@@ -1,12 +1,23 @@
import asyncio
import logging
import sys
from inspect import isawaitable
import pytest
from sanic.exceptions import SanicException
from sanic.response import text
def uvloop_installed():
try:
import uvloop
return True
except ImportError:
return False
def test_app_loop_running(app):
@app.get("/test")
async def handler(request):
@@ -17,9 +28,35 @@ def test_app_loop_running(app):
assert response.text == "pass"
@pytest.mark.skipif(
sys.version_info < (3, 7), reason="requires python3.7 or higher"
)
def test_create_asyncio_server(app):
if not uvloop_installed():
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(return_asyncio_server=True)
assert isawaitable(asyncio_srv_coro)
srv = loop.run_until_complete(asyncio_srv_coro)
assert srv.is_serving() is True
@pytest.mark.skipif(
sys.version_info < (3, 7), reason="requires python3.7 or higher"
)
def test_asyncio_server_start_serving(app):
if not uvloop_installed():
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
return_asyncio_server=True,
asyncio_server_kwargs=dict(start_serving=False),
)
srv = loop.run_until_complete(asyncio_srv_coro)
assert srv.is_serving() is False
def test_app_loop_not_running(app):
with pytest.raises(SanicException) as excinfo:
app.loop
_ = app.loop
assert str(excinfo.value) == (
"Loop can only be retrieved after the app has started "
@@ -103,7 +140,6 @@ def test_handle_request_with_nested_exception(app, monkeypatch):
@app.get("/")
def handler(request):
raise Exception
return text("OK")
request, response = app.test_client.get("/")
assert response.status == 500
@@ -125,7 +161,6 @@ def test_handle_request_with_nested_exception_debug(app, monkeypatch):
@app.get("/")
def handler(request):
raise Exception
return text("OK")
request, response = app.test_client.get("/", debug=True)
assert response.status == 500
@@ -149,14 +184,13 @@ def test_handle_request_with_nested_sanic_exception(app, monkeypatch, caplog):
@app.get("/")
def handler(request):
raise Exception
return text("OK")
with caplog.at_level(logging.ERROR):
request, response = app.test_client.get("/")
assert response.status == 500
assert response.text == "Error: Mock SanicException"
assert caplog.record_tuples[0] == (
assert (
"sanic.root",
logging.ERROR,
"Exception occurred while handling uri: 'http://127.0.0.1:42101/'",
)
) in caplog.record_tuples

View File

@@ -0,0 +1,180 @@
from pytest import raises
from sanic.app import Sanic
from sanic.blueprints import Blueprint
from sanic.request import Request
from sanic.response import text, HTTPResponse
MIDDLEWARE_INVOKE_COUNTER = {"request": 0, "response": 0}
AUTH = "dGVzdDp0ZXN0Cg=="
def test_bp_group_indexing(app: Sanic):
blueprint_1 = Blueprint("blueprint_1", url_prefix="/bp1")
blueprint_2 = Blueprint("blueprint_2", url_prefix="/bp2")
group = Blueprint.group(blueprint_1, blueprint_2)
assert group[0] == blueprint_1
with raises(expected_exception=IndexError) as e:
_ = group[3]
def test_bp_group_with_additional_route_params(app: Sanic):
blueprint_1 = Blueprint("blueprint_1", url_prefix="/bp1")
blueprint_2 = Blueprint("blueprint_2", url_prefix="/bp2")
@blueprint_1.route(
"/request_path", methods=frozenset({"PUT", "POST"}), version=2
)
def blueprint_1_v2_method_with_put_and_post(request: Request):
if request.method == "PUT":
return text("PUT_OK")
elif request.method == "POST":
return text("POST_OK")
@blueprint_2.route(
"/route/<param>", methods=frozenset({"DELETE", "PATCH"}), name="test"
)
def blueprint_2_named_method(request: Request, param):
if request.method == "DELETE":
return text("DELETE_{}".format(param))
elif request.method == "PATCH":
return text("PATCH_{}".format(param))
blueprint_group = Blueprint.group(
blueprint_1, blueprint_2, url_prefix="/api"
)
@blueprint_group.middleware("request")
def authenticate_request(request: Request):
global AUTH
auth = request.headers.get("authorization")
if auth:
# Dummy auth check. We can have anything here and it's fine.
if AUTH not in auth:
return text("Unauthorized", status=401)
else:
return text("Unauthorized", status=401)
@blueprint_group.middleware("response")
def enhance_response_middleware(request: Request, response: HTTPResponse):
response.headers.add("x-test-middleware", "value")
app.blueprint(blueprint_group)
header = {"authorization": " ".join(["Basic", AUTH])}
_, response = app.test_client.put(
"/v2/api/bp1/request_path", headers=header
)
assert response.text == "PUT_OK"
assert response.headers.get("x-test-middleware") == "value"
_, response = app.test_client.post(
"/v2/api/bp1/request_path", headers=header
)
assert response.text == "POST_OK"
_, response = app.test_client.delete("/api/bp2/route/bp2", headers=header)
assert response.text == "DELETE_bp2"
_, response = app.test_client.patch("/api/bp2/route/bp2", headers=header)
assert response.text == "PATCH_bp2"
_, response = app.test_client.get("/v2/api/bp1/request_path")
assert response.status == 401
def test_bp_group(app: Sanic):
blueprint_1 = Blueprint("blueprint_1", url_prefix="/bp1")
blueprint_2 = Blueprint("blueprint_2", url_prefix="/bp2")
@blueprint_1.route("/")
def blueprint_1_default_route(request):
return text("BP1_OK")
@blueprint_2.route("/")
def blueprint_2_default_route(request):
return text("BP2_OK")
blueprint_group_1 = Blueprint.group(
blueprint_1, blueprint_2, url_prefix="/bp"
)
blueprint_3 = Blueprint("blueprint_3", url_prefix="/bp3")
@blueprint_group_1.middleware("request")
def blueprint_group_1_middleware(request):
global MIDDLEWARE_INVOKE_COUNTER
MIDDLEWARE_INVOKE_COUNTER["request"] += 1
@blueprint_3.route("/")
def blueprint_3_default_route(request):
return text("BP3_OK")
blueprint_group_2 = Blueprint.group(
blueprint_group_1, blueprint_3, url_prefix="/api"
)
@blueprint_group_2.middleware("response")
def blueprint_group_2_middleware(request, response):
global MIDDLEWARE_INVOKE_COUNTER
MIDDLEWARE_INVOKE_COUNTER["response"] += 1
app.blueprint(blueprint_group_2)
@app.route("/")
def app_default_route(request):
return text("APP_OK")
_, response = app.test_client.get("/")
assert response.text == "APP_OK"
_, response = app.test_client.get("/api/bp/bp1")
assert response.text == "BP1_OK"
_, response = app.test_client.get("/api/bp/bp2")
assert response.text == "BP2_OK"
_, response = app.test_client.get("/api/bp3")
assert response.text == "BP3_OK"
assert MIDDLEWARE_INVOKE_COUNTER["response"] == 4
assert MIDDLEWARE_INVOKE_COUNTER["request"] == 4
def test_bp_group_list_operations(app: Sanic):
blueprint_1 = Blueprint("blueprint_1", url_prefix="/bp1")
blueprint_2 = Blueprint("blueprint_2", url_prefix="/bp2")
@blueprint_1.route("/")
def blueprint_1_default_route(request):
return text("BP1_OK")
@blueprint_2.route("/")
def blueprint_2_default_route(request):
return text("BP2_OK")
blueprint_group_1 = Blueprint.group(
blueprint_1, blueprint_2, url_prefix="/bp"
)
blueprint_3 = Blueprint("blueprint_2", url_prefix="/bp3")
@blueprint_3.route("/second")
def blueprint_3_second_route(request):
return text("BP3_OK")
assert len(blueprint_group_1) == 2
blueprint_group_1.append(blueprint_3)
assert len(blueprint_group_1) == 3
del blueprint_group_1[2]
assert len(blueprint_group_1) == 2
blueprint_group_1[1] = blueprint_3
assert len(blueprint_group_1) == 2
assert blueprint_group_1.url_prefix == "/bp"

View File

@@ -6,6 +6,7 @@ from textwrap import dedent
import pytest
from sanic import Sanic
from sanic.config import Config, DEFAULT_CONFIG
from sanic.exceptions import PyFileError
@@ -47,6 +48,13 @@ def test_auto_load_env():
del environ["SANIC_TEST_ANSWER"]
def test_auto_load_bool_env():
environ["SANIC_TEST_ANSWER"] = "True"
app = Sanic()
assert app.config.TEST_ANSWER == True
del environ["SANIC_TEST_ANSWER"]
def test_dont_load_env():
environ["SANIC_TEST_ANSWER"] = "42"
app = Sanic(load_env=False)
@@ -149,6 +157,115 @@ def test_overwrite_exisiting_config_ignore_lowercase(app):
def test_missing_config(app):
with pytest.raises(AttributeError) as e:
app.config.NON_EXISTENT
assert str(e.value) == ("Config has no 'NON_EXISTENT'")
with pytest.raises(
AttributeError, match="Config has no 'NON_EXISTENT'"
) as e:
_ = app.config.NON_EXISTENT
def test_config_defaults():
"""
load DEFAULT_CONFIG
"""
conf = Config()
for key, value in DEFAULT_CONFIG.items():
assert getattr(conf, key) == value
def test_config_custom_defaults():
"""
we should have all the variables from defaults rewriting them with custom defaults passed in
Config
"""
custom_defaults = {
"REQUEST_MAX_SIZE": 1,
"KEEP_ALIVE": False,
"ACCESS_LOG": False,
}
conf = Config(defaults=custom_defaults)
for key, value in DEFAULT_CONFIG.items():
if key in custom_defaults.keys():
value = custom_defaults[key]
assert getattr(conf, key) == value
def test_config_custom_defaults_with_env():
"""
test that environment variables has higher priority than DEFAULT_CONFIG and passed defaults dict
"""
custom_defaults = {
"REQUEST_MAX_SIZE123": 1,
"KEEP_ALIVE123": False,
"ACCESS_LOG123": False,
}
environ_defaults = {
"SANIC_REQUEST_MAX_SIZE123": "2",
"SANIC_KEEP_ALIVE123": "True",
"SANIC_ACCESS_LOG123": "False",
}
for key, value in environ_defaults.items():
environ[key] = value
conf = Config(defaults=custom_defaults)
for key, value in DEFAULT_CONFIG.items():
if "SANIC_" + key in environ_defaults.keys():
value = environ_defaults["SANIC_" + key]
try:
value = int(value)
except ValueError:
if value in ["True", "False"]:
value = value == "True"
assert getattr(conf, key) == value
for key, value in environ_defaults.items():
del environ[key]
def test_config_access_log_passing_in_run(app):
assert app.config.ACCESS_LOG == True
@app.listener("after_server_start")
async def _request(sanic, loop):
app.stop()
app.run(port=1340, access_log=False)
assert app.config.ACCESS_LOG == False
app.run(port=1340, access_log=True)
assert app.config.ACCESS_LOG == True
async def test_config_access_log_passing_in_create_server(app):
assert app.config.ACCESS_LOG == True
@app.listener("after_server_start")
async def _request(sanic, loop):
app.stop()
await app.create_server(
port=1341, access_log=False, return_asyncio_server=True
)
assert app.config.ACCESS_LOG == False
await app.create_server(
port=1342, access_log=True, return_asyncio_server=True
)
assert app.config.ACCESS_LOG == True
def test_config_rewrite_keep_alive():
config = Config()
assert config.KEEP_ALIVE == DEFAULT_CONFIG["KEEP_ALIVE"]
config = Config(keep_alive=True)
assert config.KEEP_ALIVE == True
config = Config(keep_alive=False)
assert config.KEEP_ALIVE == False
# use defaults
config = Config(defaults={"KEEP_ALIVE": False})
assert config.KEEP_ALIVE == False
config = Config(defaults={"KEEP_ALIVE": True})
assert config.KEEP_ALIVE == True

View File

@@ -2,7 +2,7 @@ from datetime import datetime, timedelta
from http.cookies import SimpleCookie
from sanic.response import text
import pytest
from sanic.cookies import Cookie
from sanic.cookies import Cookie, DEFAULT_MAX_AGE
# ------------------------------------------------------------ #
# GET
@@ -100,7 +100,7 @@ def test_cookie_deletion(app):
assert int(response_cookies["i_want_to_die"]["max-age"]) == 0
with pytest.raises(KeyError):
response.cookies["i_never_existed"]
_ = response.cookies["i_never_existed"]
def test_cookie_reserved_cookie():
@@ -138,7 +138,7 @@ def test_cookie_set_same_key(app):
assert response.cookies["test"].value == "pass"
@pytest.mark.parametrize("max_age", ["0", 30, "30"])
@pytest.mark.parametrize("max_age", ["0", 30, 30.0, 30.1, "30", "test"])
def test_cookie_max_age(app, max_age):
cookies = {"test": "wait"}
@@ -153,13 +153,14 @@ def test_cookie_max_age(app, max_age):
assert response.status == 200
assert response.cookies["test"].value == "pass"
assert response.cookies["test"]["max-age"] == str(max_age)
if str(max_age).isdigit() and int(max_age) == float(max_age):
assert response.cookies["test"]["max-age"] == str(max_age)
else:
assert response.cookies["test"]["max-age"] == str(DEFAULT_MAX_AGE)
@pytest.mark.parametrize(
"expires",
[datetime.now() + timedelta(seconds=60), "Fri, 21-Dec-2018 15:30:00 GMT"],
)
@pytest.mark.parametrize("expires", [datetime.now() + timedelta(seconds=60)])
def test_cookie_expires(app, expires):
cookies = {"test": "wait"}
@@ -179,3 +180,11 @@ def test_cookie_expires(app, expires):
expires = expires.strftime("%a, %d-%b-%Y %T GMT")
assert response.cookies["test"]["expires"] == expires
@pytest.mark.parametrize("expires", ["Fri, 21-Dec-2018 15:30:00 GMT"])
def test_cookie_expires_illegal_instance_type(expires):
c = Cookie("test_cookie", "value")
with pytest.raises(expected_exception=TypeError) as e:
c["expires"] = expires
assert e.message == "Cookie 'expires' property must be a datetime"

View File

@@ -39,5 +39,5 @@ def test_overload_dynamic_routes_exist(app):
with pytest.raises(RouteExists):
@app.route("/overload/<param>", methods=["PUT", "DELETE"])
async def handler3(request):
async def handler3(request, param):
return text("Duplicated")

View File

@@ -74,7 +74,7 @@ def exception_app():
@app.route("/divide_by_zero")
def handle_unhandled_exception(request):
1 / 0
_ = 1 / 0
@app.route("/error_in_error_handler_handler")
def custom_error_handler(request):
@@ -82,7 +82,7 @@ def exception_app():
@app.exception(SanicExceptionTestException)
def error_in_error_handler_handler(request, exception):
1 / 0
_ = 1 / 0
return app

View File

@@ -3,13 +3,15 @@ from sanic import Sanic
import asyncio
from asyncio import sleep as aio_sleep
from sanic.response import text
from sanic.config import Config
from sanic import server
import aiohttp
from aiohttp import TCPConnector
from sanic.testing import SanicTestClient, HOST, PORT
CONFIG_FOR_TESTS = {"KEEP_ALIVE_TIMEOUT": 2, "KEEP_ALIVE": True}
class ReuseableTCPConnector(TCPConnector):
def __init__(self, *args, **kwargs):
super(ReuseableTCPConnector, self).__init__(*args, **kwargs)
@@ -47,7 +49,7 @@ class ReuseableSanicTestClient(SanicTestClient):
uri="/",
gather_request=True,
debug=False,
server_kwargs={},
server_kwargs={"return_asyncio_server": True},
*request_args,
**request_kwargs
):
@@ -141,7 +143,7 @@ class ReuseableSanicTestClient(SanicTestClient):
# loop, so the changes above are required too.
async def _local_request(self, method, uri, cookies=None, *args, **kwargs):
request_keepalive = kwargs.pop(
"request_keepalive", Config.KEEP_ALIVE_TIMEOUT
"request_keepalive", CONFIG_FOR_TESTS["KEEP_ALIVE_TIMEOUT"]
)
if uri.startswith(("http:", "https:", "ftp:", "ftps://" "//")):
url = uri
@@ -157,7 +159,7 @@ class ReuseableSanicTestClient(SanicTestClient):
conn = self._tcp_connector
else:
conn = ReuseableTCPConnector(
verify_ssl=False,
ssl=False,
loop=self._loop,
keepalive_timeout=request_keepalive,
)
@@ -191,12 +193,14 @@ class ReuseableSanicTestClient(SanicTestClient):
return response
Config.KEEP_ALIVE_TIMEOUT = 2
Config.KEEP_ALIVE = True
keep_alive_timeout_app_reuse = Sanic("test_ka_timeout_reuse")
keep_alive_app_client_timeout = Sanic("test_ka_client_timeout")
keep_alive_app_server_timeout = Sanic("test_ka_server_timeout")
keep_alive_timeout_app_reuse.config.update(CONFIG_FOR_TESTS)
keep_alive_app_client_timeout.config.update(CONFIG_FOR_TESTS)
keep_alive_app_server_timeout.config.update(CONFIG_FOR_TESTS)
@keep_alive_timeout_app_reuse.route("/1")
async def handler1(request):

View File

@@ -5,13 +5,14 @@ from sanic.config import BASE_LOGO
try:
import uvloop # noqa
ROW = 0
except BaseException:
ROW = 1
def test_logo_base(app, caplog):
server = app.create_server(debug=True)
server = app.create_server(debug=True, return_asyncio_server=True)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop._stopping = False
@@ -30,7 +31,7 @@ def test_logo_base(app, caplog):
def test_logo_false(app, caplog):
app.config.LOGO = False
server = app.create_server(debug=True)
server = app.create_server(debug=True, return_asyncio_server=True)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop._stopping = False
@@ -49,7 +50,7 @@ def test_logo_false(app, caplog):
def test_logo_true(app, caplog):
app.config.LOGO = True
server = app.create_server(debug=True)
server = app.create_server(debug=True, return_asyncio_server=True)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop._stopping = False
@@ -68,7 +69,7 @@ def test_logo_true(app, caplog):
def test_logo_custom(app, caplog):
app.config.LOGO = "My Custom Logo"
server = app.create_server(debug=True)
server = app.create_server(debug=True, return_asyncio_server=True)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop._stopping = False

View File

@@ -73,6 +73,8 @@ def test_middleware_response_exception(app):
def test_middleware_response_raise_cancelled_error(app, caplog):
app.config.RESPONSE_TIMEOUT = 1
@app.middleware("response")
async def process_response(request, response):
raise CancelledError("CancelledError at response middleware")
@@ -84,12 +86,12 @@ def test_middleware_response_raise_cancelled_error(app, caplog):
with caplog.at_level(logging.ERROR):
reqrequest, response = app.test_client.get("/")
assert response.status == 503
assert caplog.record_tuples[0] == (
"sanic.root",
logging.ERROR,
"Exception occurred while handling uri: 'http://127.0.0.1:42101/'",
)
assert response.status == 503
assert (
"sanic.root",
logging.ERROR,
"Exception occurred while handling uri: 'http://127.0.0.1:42101/'",
) in caplog.record_tuples
def test_middleware_response_raise_exception(app, caplog):
@@ -101,16 +103,16 @@ def test_middleware_response_raise_exception(app, caplog):
reqrequest, response = app.test_client.get("/")
assert response.status == 404
assert caplog.record_tuples[0] == (
assert (
"sanic.root",
logging.ERROR,
"Exception occurred while handling uri: 'http://127.0.0.1:42101/'",
)
assert caplog.record_tuples[1] == (
) in caplog.record_tuples
assert (
"sanic.error",
logging.ERROR,
"Exception occurred in one of response middleware handlers",
)
) in caplog.record_tuples
def test_middleware_override_request(app):

View File

@@ -270,6 +270,21 @@ def test_request_stream_blueprint(app):
return stream(streaming)
async def post_add_route(request):
assert isinstance(request.stream, StreamBuffer)
async def streaming(response):
while True:
body = await request.stream.read()
if body is None:
break
await response.write(body.decode("utf-8"))
return stream(streaming)
bp.add_route(
post_add_route, "/post/add_route", methods=["POST"], stream=True
)
app.blueprint(bp)
assert app.is_request_stream is True
@@ -314,6 +329,10 @@ def test_request_stream_blueprint(app):
assert response.status == 200
assert response.text == data
request, response = app.test_client.post("/post/add_route", data=data)
assert response.status == 200
assert response.text == data
def test_request_stream_composition_view(app):
"""for self.is_request_stream = True"""

View File

@@ -3,7 +3,6 @@ from json import JSONDecodeError
from sanic import Sanic
import asyncio
from sanic.response import text
from sanic.config import Config
import aiohttp
from aiohttp import TCPConnector
from sanic.testing import SanicTestClient, HOST
@@ -152,9 +151,7 @@ class DelayableSanicTestClient(SanicTestClient):
host=HOST, port=self.port, uri=uri
)
conn = DelayableTCPConnector(
pre_request_delay=self._request_delay,
verify_ssl=False,
loop=self._loop,
pre_request_delay=self._request_delay, ssl=False, loop=self._loop
)
async with aiohttp.ClientSession(
cookies=cookies, connector=conn, loop=self._loop
@@ -183,9 +180,10 @@ class DelayableSanicTestClient(SanicTestClient):
return response
Config.REQUEST_TIMEOUT = 0.6
request_timeout_default_app = Sanic("test_request_timeout_default")
request_no_timeout_app = Sanic("test_request_no_timeout")
request_timeout_default_app.config.REQUEST_TIMEOUT = 0.6
request_no_timeout_app.config.REQUEST_TIMEOUT = 0.6
@request_timeout_default_app.route("/1")

View File

@@ -7,6 +7,8 @@ from urllib.parse import urlparse
import pytest
from sanic import Sanic
from sanic import Blueprint
from sanic.exceptions import ServerError
from sanic.request import DEFAULT_HTTP_CONTENT_TYPE
from sanic.response import json, text
@@ -132,7 +134,7 @@ def test_query_string(app):
def test_uri_template(app):
@app.route("/foo/<id:int>/bar/<name:[A-z]+>")
async def handler(request):
async def handler(request, id, name):
return text("OK")
request, response = app.test_client.get("/foo/123/bar/baz")
@@ -430,21 +432,41 @@ def test_request_string_representation(app):
@pytest.mark.parametrize(
"payload",
"payload,filename",
[
"------sanic\r\n"
'Content-Disposition: form-data; filename="filename"; name="test"\r\n'
"\r\n"
"OK\r\n"
"------sanic--\r\n",
"------sanic\r\n"
'content-disposition: form-data; filename="filename"; name="test"\r\n'
"\r\n"
'content-type: application/json; {"field": "value"}\r\n'
"------sanic--\r\n",
("------sanic\r\n"
'Content-Disposition: form-data; filename="filename"; name="test"\r\n'
"\r\n"
"OK\r\n"
"------sanic--\r\n", "filename"),
("------sanic\r\n"
'content-disposition: form-data; filename="filename"; name="test"\r\n'
"\r\n"
'content-type: application/json; {"field": "value"}\r\n'
"------sanic--\r\n", "filename"),
("------sanic\r\n"
'Content-Disposition: form-data; filename=""; name="test"\r\n'
"\r\n"
"OK\r\n"
"------sanic--\r\n", ""),
("------sanic\r\n"
'content-disposition: form-data; filename=""; name="test"\r\n'
"\r\n"
'content-type: application/json; {"field": "value"}\r\n'
"------sanic--\r\n", ""),
("------sanic\r\n"
'Content-Disposition: form-data; filename*="utf-8\'\'filename_%C2%A0_test"; name="test"\r\n'
"\r\n"
"OK\r\n"
"------sanic--\r\n", "filename_\u00A0_test"),
("------sanic\r\n"
'content-disposition: form-data; filename*="utf-8\'\'filename_%C2%A0_test"; name="test"\r\n'
"\r\n"
'content-type: application/json; {"field": "value"}\r\n'
"------sanic--\r\n", "filename_\u00A0_test"),
],
)
def test_request_multipart_files(app, payload):
def test_request_multipart_files(app, payload, filename):
@app.route("/", methods=["POST"])
async def post(request):
return text("OK")
@@ -452,7 +474,7 @@ def test_request_multipart_files(app, payload):
headers = {"content-type": "multipart/form-data; boundary=----sanic"}
request, _ = app.test_client.post(data=payload, headers=headers)
assert request.files.get("test").name == "filename"
assert request.files.get("test").name == filename
def test_request_multipart_file_with_json_content_type(app):
@@ -564,7 +586,7 @@ def test_request_repr(app):
assert repr(request) == "<Request: GET />"
request.method = None
assert repr(request) == "<Request>"
assert repr(request) == "<Request: None />"
def test_request_bool(app):
@@ -698,3 +720,42 @@ def test_request_form_invalid_content_type(app):
request, response = app.test_client.post("/", json={"test": "OK"})
assert request.form == {}
def test_endpoint_basic():
app = Sanic()
@app.route("/")
def my_unique_handler(request):
return text("Hello")
request, response = app.test_client.get("/")
assert request.endpoint == "test_requests.my_unique_handler"
def test_endpoint_named_app():
app = Sanic("named")
@app.route("/")
def my_unique_handler(request):
return text("Hello")
request, response = app.test_client.get("/")
assert request.endpoint == "named.my_unique_handler"
def test_endpoint_blueprint():
bp = Blueprint("my_blueprint", url_prefix="/bp")
@bp.route("/")
async def bp_root(request):
return text("Hello")
app = Sanic("named")
app.blueprint(bp)
request, response = app.test_client.get("/bp")
assert request.endpoint == "named.my_blueprint.bp_root"

View File

@@ -2,13 +2,15 @@ from sanic import Sanic
import asyncio
from sanic.response import text
from sanic.exceptions import ServiceUnavailable
from sanic.config import Config
Config.RESPONSE_TIMEOUT = 1
response_timeout_app = Sanic("test_response_timeout")
response_timeout_default_app = Sanic("test_response_timeout_default")
response_handler_cancelled_app = Sanic("test_response_handler_cancelled")
response_timeout_app.config.RESPONSE_TIMEOUT = 1
response_timeout_default_app.config.RESPONSE_TIMEOUT = 1
response_handler_cancelled_app.config.RESPONSE_TIMEOUT = 1
@response_timeout_app.route("/1")
async def handler_1(request):

View File

@@ -83,7 +83,7 @@ async def test_trigger_before_events_create_server(app):
async def init_db(app, loop):
app.db = MySanicDb()
await app.create_server()
await app.create_server(debug=True, return_asyncio_server=True)
assert hasattr(app, "db")
assert isinstance(app.db, MySanicDb)

View File

@@ -169,12 +169,28 @@ def test_fails_with_int_message(app):
app.url_for("fail", **failing_kwargs)
expected_error = (
'Value "not_int" for parameter `foo` '
"does not match pattern for type `int`: \d+"
r'Value "not_int" for parameter `foo` '
r'does not match pattern for type `int`: -?\d+'
)
assert str(e.value) == expected_error
def test_passes_with_negative_int_message(app):
@app.route("path/<possibly_neg:int>/another-word")
def good(request, possibly_neg):
assert isinstance(possibly_neg, int)
return text("this should pass with `{}`".format(possibly_neg))
u_plus_3 = app.url_for("good", possibly_neg=3)
assert u_plus_3 == "/path/3/another-word", u_plus_3
request, response = app.test_client.get(u_plus_3)
assert response.text == "this should pass with `3`"
u_neg_3 = app.url_for("good", possibly_neg=-3)
assert u_neg_3 == "/path/-3/another-word", u_neg_3
request, response = app.test_client.get(u_neg_3)
assert response.text == "this should pass with `-3`"
def test_fails_with_two_letter_string_message(app):
@app.route(COMPLEX_PARAM_URL)
def fail(request):
@@ -207,12 +223,26 @@ def test_fails_with_number_message(app):
expected_error = (
'Value "foo" for parameter `some_number` '
"does not match pattern for type `float`: [0-9\\\\.]+"
"does not match pattern for type `float`: -?[0-9\\\\.]+"
)
assert str(e.value) == expected_error
@pytest.mark.parametrize("number", [3, -3, 13.123, -13.123])
def test_passes_with_negative_number_message(app, number):
@app.route("path/<possibly_neg:number>/another-word")
def good(request, possibly_neg):
assert isinstance(possibly_neg, (int, float))
return text("this should pass with `{}`".format(possibly_neg))
u = app.url_for("good", possibly_neg=number)
assert u == "/path/{}/another-word".format(number), u
request, response = app.test_client.get(u)
# For ``number``, it has been cast to a float - so a ``3`` becomes a ``3.0``
assert response.text == "this should pass with `{}`".format(float(number))
def test_adds_other_supplied_values_as_query_string(app):
@app.route(COMPLEX_PARAM_URL)
def passes(request):

View File

@@ -24,12 +24,61 @@ def gunicorn_worker():
worker.kill()
@pytest.fixture(scope="module")
def gunicorn_worker_with_access_logs():
command = (
"gunicorn "
"--bind 127.0.0.1:1338 "
"--worker-class sanic.worker.GunicornWorker "
"examples.simple_server:app"
)
worker = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE)
time.sleep(2)
return worker
@pytest.fixture(scope="module")
def gunicorn_worker_with_env_var():
command = (
'env SANIC_ACCESS_LOG="False" '
"gunicorn "
"--bind 127.0.0.1:1339 "
"--worker-class sanic.worker.GunicornWorker "
"--log-level info "
"examples.simple_server:app"
)
worker = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE)
time.sleep(2)
return worker
def test_gunicorn_worker(gunicorn_worker):
with urllib.request.urlopen("http://localhost:1337/") as f:
res = json.loads(f.read(100).decode())
assert res["test"]
def test_gunicorn_worker_no_logs(gunicorn_worker_with_env_var):
"""
if SANIC_ACCESS_LOG was set to False do not show access logs
"""
with urllib.request.urlopen("http://localhost:1339/") as _:
gunicorn_worker_with_env_var.kill()
assert not gunicorn_worker_with_env_var.stdout.read()
def test_gunicorn_worker_with_logs(gunicorn_worker_with_access_logs):
"""
default - show access logs
"""
with urllib.request.urlopen("http://localhost:1338/") as _:
gunicorn_worker_with_access_logs.kill()
assert (
b"(sanic.access)[INFO][127.0.0.1"
in gunicorn_worker_with_access_logs.stdout.read()
)
class GunicornTestWorker(GunicornWorker):
def __init__(self):
self.app = mock.Mock()