Merge branch 'master' into streaming
This commit is contained in:
commit
f8298939c0
10
sanic/app.py
10
sanic/app.py
|
@ -826,6 +826,14 @@ class Sanic:
|
||||||
"Endpoint with name `{}` was not found".format(view_name)
|
"Endpoint with name `{}` was not found".format(view_name)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# If the route has host defined, split that off
|
||||||
|
# TODO: Retain netloc and path separately in Route objects
|
||||||
|
host = uri.find("/")
|
||||||
|
if host > 0:
|
||||||
|
host, uri = uri[:host], uri[host:]
|
||||||
|
else:
|
||||||
|
host = None
|
||||||
|
|
||||||
if view_name == "static" or view_name.endswith(".static"):
|
if view_name == "static" or view_name.endswith(".static"):
|
||||||
filename = kwargs.pop("filename", None)
|
filename = kwargs.pop("filename", None)
|
||||||
# it's static folder
|
# it's static folder
|
||||||
|
@ -858,7 +866,7 @@ class Sanic:
|
||||||
|
|
||||||
netloc = kwargs.pop("_server", None)
|
netloc = kwargs.pop("_server", None)
|
||||||
if netloc is None and external:
|
if netloc is None and external:
|
||||||
netloc = self.config.get("SERVER_NAME", "")
|
netloc = host or self.config.get("SERVER_NAME", "")
|
||||||
|
|
||||||
if external:
|
if external:
|
||||||
if not scheme:
|
if not scheme:
|
||||||
|
|
|
@ -103,7 +103,7 @@ def sanic_router():
|
||||||
for method, route in route_details:
|
for method, route in route_details:
|
||||||
try:
|
try:
|
||||||
router._add(
|
router._add(
|
||||||
uri="/{}".format(route),
|
uri=f"/{route}",
|
||||||
methods=frozenset({method}),
|
methods=frozenset({method}),
|
||||||
host="localhost",
|
host="localhost",
|
||||||
handler=_handler,
|
handler=_handler,
|
||||||
|
|
|
@ -179,9 +179,7 @@ def test_handle_request_with_nested_exception_debug(app, monkeypatch):
|
||||||
request, response = app.test_client.get("/", debug=True)
|
request, response = app.test_client.get("/", debug=True)
|
||||||
assert response.status == 500
|
assert response.status == 500
|
||||||
assert response.text.startswith(
|
assert response.text.startswith(
|
||||||
"Error while handling error: {}\nStack: Traceback (most recent call last):\n".format(
|
f"Error while handling error: {err_msg}\nStack: Traceback (most recent call last):\n"
|
||||||
err_msg
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -40,9 +40,9 @@ def test_bp_group_with_additional_route_params(app: Sanic):
|
||||||
)
|
)
|
||||||
def blueprint_2_named_method(request: Request, param):
|
def blueprint_2_named_method(request: Request, param):
|
||||||
if request.method == "DELETE":
|
if request.method == "DELETE":
|
||||||
return text("DELETE_{}".format(param))
|
return text(f"DELETE_{param}")
|
||||||
elif request.method == "PATCH":
|
elif request.method == "PATCH":
|
||||||
return text("PATCH_{}".format(param))
|
return text(f"PATCH_{param}")
|
||||||
|
|
||||||
blueprint_group = Blueprint.group(
|
blueprint_group = Blueprint.group(
|
||||||
blueprint_1, blueprint_2, url_prefix="/api"
|
blueprint_1, blueprint_2, url_prefix="/api"
|
||||||
|
|
|
@ -46,19 +46,19 @@ def test_versioned_routes_get(app, method):
|
||||||
func = getattr(bp, method)
|
func = getattr(bp, method)
|
||||||
if callable(func):
|
if callable(func):
|
||||||
|
|
||||||
@func("/{}".format(method), version=1)
|
@func(f"/{method}", version=1)
|
||||||
def handler(request):
|
def handler(request):
|
||||||
return text("OK")
|
return text("OK")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
print(func)
|
print(func)
|
||||||
raise Exception("{} is not callable".format(func))
|
raise Exception(f"{func} is not callable")
|
||||||
|
|
||||||
app.blueprint(bp)
|
app.blueprint(bp)
|
||||||
|
|
||||||
client_method = getattr(app.test_client, method)
|
client_method = getattr(app.test_client, method)
|
||||||
|
|
||||||
request, response = client_method("/v1/{}".format(method))
|
request, response = client_method(f"/v1/{method}")
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
|
|
||||||
|
|
||||||
|
@ -543,7 +543,7 @@ def test_bp_group_with_default_url_prefix(app):
|
||||||
|
|
||||||
resource_id = str(uuid4())
|
resource_id = str(uuid4())
|
||||||
request, response = app.test_client.get(
|
request, response = app.test_client.get(
|
||||||
"/api/v1/resources/{0}".format(resource_id)
|
f"/api/v1/resources/{resource_id}"
|
||||||
)
|
)
|
||||||
assert response.json == {"resource_id": resource_id}
|
assert response.json == {"resource_id": resource_id}
|
||||||
|
|
||||||
|
@ -658,9 +658,9 @@ def test_duplicate_blueprint(app):
|
||||||
app.blueprint(bp1)
|
app.blueprint(bp1)
|
||||||
|
|
||||||
assert str(excinfo.value) == (
|
assert str(excinfo.value) == (
|
||||||
'A blueprint with the name "{}" is already registered. '
|
f'A blueprint with the name "{bp_name}" is already registered. '
|
||||||
"Blueprint names must be unique."
|
"Blueprint names must be unique."
|
||||||
).format(bp_name)
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("debug", [True, False, None])
|
@pytest.mark.parametrize("debug", [True, False, None])
|
||||||
|
|
|
@ -15,7 +15,8 @@ from sanic.response import text
|
||||||
def test_cookies(app):
|
def test_cookies(app):
|
||||||
@app.route("/")
|
@app.route("/")
|
||||||
def handler(request):
|
def handler(request):
|
||||||
response = text("Cookies are: {}".format(request.cookies["test"]))
|
cookie_value = request.cookies["test"]
|
||||||
|
response = text(f"Cookies are: {cookie_value}")
|
||||||
response.cookies["right_back"] = "at you"
|
response.cookies["right_back"] = "at you"
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
@ -31,7 +32,8 @@ def test_cookies(app):
|
||||||
async def test_cookies_asgi(app):
|
async def test_cookies_asgi(app):
|
||||||
@app.route("/")
|
@app.route("/")
|
||||||
def handler(request):
|
def handler(request):
|
||||||
response = text("Cookies are: {}".format(request.cookies["test"]))
|
cookie_value = request.cookies["test"]
|
||||||
|
response = text(f"Cookies are: {cookie_value}")
|
||||||
response.cookies["right_back"] = "at you"
|
response.cookies["right_back"] = "at you"
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
@ -78,7 +80,8 @@ def test_false_cookies(app, httponly, expected):
|
||||||
def test_http2_cookies(app):
|
def test_http2_cookies(app):
|
||||||
@app.route("/")
|
@app.route("/")
|
||||||
async def handler(request):
|
async def handler(request):
|
||||||
response = text("Cookies are: {}".format(request.cookies["test"]))
|
cookie_value = request.cookies["test"]
|
||||||
|
response = text(f"Cookies are: {cookie_value}")
|
||||||
return response
|
return response
|
||||||
|
|
||||||
headers = {"cookie": "test=working!"}
|
headers = {"cookie": "test=working!"}
|
||||||
|
|
|
@ -43,7 +43,7 @@ def handler_6(request, arg):
|
||||||
try:
|
try:
|
||||||
foo = 1 / arg
|
foo = 1 / arg
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise e from ValueError("{}".format(arg))
|
raise e from ValueError(f"{arg}")
|
||||||
return text(foo)
|
return text(foo)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -97,11 +97,9 @@ class ReuseableSanicTestClient(SanicTestClient):
|
||||||
):
|
):
|
||||||
url = uri
|
url = uri
|
||||||
else:
|
else:
|
||||||
uri = uri if uri.startswith("/") else "/{uri}".format(uri=uri)
|
uri = uri if uri.startswith("/") else f"/{uri}"
|
||||||
scheme = "http"
|
scheme = "http"
|
||||||
url = "{scheme}://{host}:{port}{uri}".format(
|
url = f"{scheme}://{HOST}:{PORT}{uri}"
|
||||||
scheme=scheme, host=HOST, port=PORT, uri=uri
|
|
||||||
)
|
|
||||||
|
|
||||||
@self.app.listener("after_server_start")
|
@self.app.listener("after_server_start")
|
||||||
async def _collect_response(loop):
|
async def _collect_response(loop):
|
||||||
|
@ -134,7 +132,7 @@ class ReuseableSanicTestClient(SanicTestClient):
|
||||||
self.app.listeners["after_server_start"].pop()
|
self.app.listeners["after_server_start"].pop()
|
||||||
|
|
||||||
if exceptions:
|
if exceptions:
|
||||||
raise ValueError("Exception during request: {}".format(exceptions))
|
raise ValueError(f"Exception during request: {exceptions}")
|
||||||
|
|
||||||
if gather_request:
|
if gather_request:
|
||||||
self.app.request_middleware.pop()
|
self.app.request_middleware.pop()
|
||||||
|
@ -143,16 +141,14 @@ class ReuseableSanicTestClient(SanicTestClient):
|
||||||
return request, response
|
return request, response
|
||||||
except Exception:
|
except Exception:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"Request and response object expected, got ({})".format(
|
f"Request and response object expected, got ({results})"
|
||||||
results
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
return results[-1]
|
return results[-1]
|
||||||
except Exception:
|
except Exception:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"Request object expected, got ({})".format(results)
|
f"Request object expected, got ({results})"
|
||||||
)
|
)
|
||||||
|
|
||||||
def kill_server(self):
|
def kill_server(self):
|
||||||
|
|
|
@ -52,7 +52,7 @@ def test_logo_false(app, caplog):
|
||||||
assert caplog.record_tuples[ROW][1] == logging.INFO
|
assert caplog.record_tuples[ROW][1] == logging.INFO
|
||||||
assert caplog.record_tuples[ROW][
|
assert caplog.record_tuples[ROW][
|
||||||
2
|
2
|
||||||
] == "Goin' Fast @ http://127.0.0.1:{}".format(PORT)
|
] == f"Goin' Fast @ http://127.0.0.1:{PORT}"
|
||||||
|
|
||||||
|
|
||||||
def test_logo_true(app, caplog):
|
def test_logo_true(app, caplog):
|
||||||
|
|
|
@ -21,13 +21,13 @@ def test_versioned_named_routes_get(app, method):
|
||||||
bp = Blueprint("test_bp", url_prefix="/bp")
|
bp = Blueprint("test_bp", url_prefix="/bp")
|
||||||
|
|
||||||
method = method.lower()
|
method = method.lower()
|
||||||
route_name = "route_{}".format(method)
|
route_name = f"route_{method}"
|
||||||
route_name2 = "route2_{}".format(method)
|
route_name2 = f"route2_{method}"
|
||||||
|
|
||||||
func = getattr(app, method)
|
func = getattr(app, method)
|
||||||
if callable(func):
|
if callable(func):
|
||||||
|
|
||||||
@func("/{}".format(method), version=1, name=route_name)
|
@func(f"/{method}", version=1, name=route_name)
|
||||||
def handler(request):
|
def handler(request):
|
||||||
return text("OK")
|
return text("OK")
|
||||||
|
|
||||||
|
@ -38,7 +38,7 @@ def test_versioned_named_routes_get(app, method):
|
||||||
func = getattr(bp, method)
|
func = getattr(bp, method)
|
||||||
if callable(func):
|
if callable(func):
|
||||||
|
|
||||||
@func("/{}".format(method), version=1, name=route_name2)
|
@func(f"/{method}", version=1, name=route_name2)
|
||||||
def handler2(request):
|
def handler2(request):
|
||||||
return text("OK")
|
return text("OK")
|
||||||
|
|
||||||
|
@ -48,14 +48,14 @@ def test_versioned_named_routes_get(app, method):
|
||||||
|
|
||||||
app.blueprint(bp)
|
app.blueprint(bp)
|
||||||
|
|
||||||
assert app.router.routes_all["/v1/{}".format(method)].name == route_name
|
assert app.router.routes_all[f"/v1/{method}"].name == route_name
|
||||||
|
|
||||||
route = app.router.routes_all["/v1/bp/{}".format(method)]
|
route = app.router.routes_all[f"/v1/bp/{method}"]
|
||||||
assert route.name == "test_bp.{}".format(route_name2)
|
assert route.name == f"test_bp.{route_name2}"
|
||||||
|
|
||||||
assert app.url_for(route_name) == "/v1/{}".format(method)
|
assert app.url_for(route_name) == f"/v1/{method}"
|
||||||
url = app.url_for("test_bp.{}".format(route_name2))
|
url = app.url_for(f"test_bp.{route_name2}")
|
||||||
assert url == "/v1/bp/{}".format(method)
|
assert url == f"/v1/bp/{method}"
|
||||||
with pytest.raises(URLBuildError):
|
with pytest.raises(URLBuildError):
|
||||||
app.url_for("handler")
|
app.url_for("handler")
|
||||||
|
|
||||||
|
|
|
@ -115,14 +115,14 @@ def test_redirect_with_params(app, test_str):
|
||||||
|
|
||||||
@app.route("/api/v1/test/<test>/")
|
@app.route("/api/v1/test/<test>/")
|
||||||
async def init_handler(request, test):
|
async def init_handler(request, test):
|
||||||
return redirect("/api/v2/test/{}/".format(use_in_uri))
|
return redirect(f"/api/v2/test/{use_in_uri}/")
|
||||||
|
|
||||||
@app.route("/api/v2/test/<test>/")
|
@app.route("/api/v2/test/<test>/")
|
||||||
async def target_handler(request, test):
|
async def target_handler(request, test):
|
||||||
assert test == test_str
|
assert test == test_str
|
||||||
return text("OK")
|
return text("OK")
|
||||||
|
|
||||||
_, response = app.test_client.get("/api/v1/test/{}/".format(use_in_uri))
|
_, response = app.test_client.get(f"/api/v1/test/{use_in_uri}/")
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
|
|
||||||
assert response.content == b"OK"
|
assert response.content == b"OK"
|
||||||
|
|
|
@ -44,7 +44,7 @@ async def test_sync_asgi(app):
|
||||||
def test_ip(app):
|
def test_ip(app):
|
||||||
@app.route("/")
|
@app.route("/")
|
||||||
def handler(request):
|
def handler(request):
|
||||||
return text("{}".format(request.ip))
|
return text(f"{request.ip}")
|
||||||
|
|
||||||
request, response = app.test_client.get("/")
|
request, response = app.test_client.get("/")
|
||||||
|
|
||||||
|
@ -55,7 +55,7 @@ def test_ip(app):
|
||||||
async def test_ip_asgi(app):
|
async def test_ip_asgi(app):
|
||||||
@app.route("/")
|
@app.route("/")
|
||||||
def handler(request):
|
def handler(request):
|
||||||
return text("{}".format(request.url))
|
return text(f"{request.url}")
|
||||||
|
|
||||||
request, response = await app.asgi_client.get("/")
|
request, response = await app.asgi_client.get("/")
|
||||||
|
|
||||||
|
@ -325,7 +325,7 @@ def test_token(app):
|
||||||
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
|
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
|
||||||
headers = {
|
headers = {
|
||||||
"content-type": "application/json",
|
"content-type": "application/json",
|
||||||
"Authorization": "{}".format(token),
|
"Authorization": f"{token}",
|
||||||
}
|
}
|
||||||
|
|
||||||
request, response = app.test_client.get("/", headers=headers)
|
request, response = app.test_client.get("/", headers=headers)
|
||||||
|
@ -335,7 +335,7 @@ def test_token(app):
|
||||||
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
|
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
|
||||||
headers = {
|
headers = {
|
||||||
"content-type": "application/json",
|
"content-type": "application/json",
|
||||||
"Authorization": "Token {}".format(token),
|
"Authorization": f"Token {token}",
|
||||||
}
|
}
|
||||||
|
|
||||||
request, response = app.test_client.get("/", headers=headers)
|
request, response = app.test_client.get("/", headers=headers)
|
||||||
|
@ -345,7 +345,7 @@ def test_token(app):
|
||||||
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
|
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
|
||||||
headers = {
|
headers = {
|
||||||
"content-type": "application/json",
|
"content-type": "application/json",
|
||||||
"Authorization": "Bearer {}".format(token),
|
"Authorization": f"Bearer {token}",
|
||||||
}
|
}
|
||||||
|
|
||||||
request, response = app.test_client.get("/", headers=headers)
|
request, response = app.test_client.get("/", headers=headers)
|
||||||
|
@ -370,7 +370,7 @@ async def test_token_asgi(app):
|
||||||
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
|
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
|
||||||
headers = {
|
headers = {
|
||||||
"content-type": "application/json",
|
"content-type": "application/json",
|
||||||
"Authorization": "{}".format(token),
|
"Authorization": f"{token}",
|
||||||
}
|
}
|
||||||
|
|
||||||
request, response = await app.asgi_client.get("/", headers=headers)
|
request, response = await app.asgi_client.get("/", headers=headers)
|
||||||
|
@ -380,7 +380,7 @@ async def test_token_asgi(app):
|
||||||
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
|
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
|
||||||
headers = {
|
headers = {
|
||||||
"content-type": "application/json",
|
"content-type": "application/json",
|
||||||
"Authorization": "Token {}".format(token),
|
"Authorization": f"Token {token}",
|
||||||
}
|
}
|
||||||
|
|
||||||
request, response = await app.asgi_client.get("/", headers=headers)
|
request, response = await app.asgi_client.get("/", headers=headers)
|
||||||
|
@ -390,7 +390,7 @@ async def test_token_asgi(app):
|
||||||
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
|
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
|
||||||
headers = {
|
headers = {
|
||||||
"content-type": "application/json",
|
"content-type": "application/json",
|
||||||
"Authorization": "Bearer {}".format(token),
|
"Authorization": f"Bearer {token}",
|
||||||
}
|
}
|
||||||
|
|
||||||
request, response = await app.asgi_client.get("/", headers=headers)
|
request, response = await app.asgi_client.get("/", headers=headers)
|
||||||
|
@ -1028,7 +1028,7 @@ def test_url_attributes_no_ssl(app, path, query, expected_url):
|
||||||
|
|
||||||
app.add_route(handler, path)
|
app.add_route(handler, path)
|
||||||
|
|
||||||
request, response = app.test_client.get(path + "?{}".format(query))
|
request, response = app.test_client.get(path + f"?{query}")
|
||||||
assert request.url == expected_url.format(HOST, PORT)
|
assert request.url == expected_url.format(HOST, PORT)
|
||||||
|
|
||||||
parsed = urlparse(request.url)
|
parsed = urlparse(request.url)
|
||||||
|
@ -1054,7 +1054,7 @@ async def test_url_attributes_no_ssl_asgi(app, path, query, expected_url):
|
||||||
|
|
||||||
app.add_route(handler, path)
|
app.add_route(handler, path)
|
||||||
|
|
||||||
request, response = await app.asgi_client.get(path + "?{}".format(query))
|
request, response = await app.asgi_client.get(path + f"?{query}")
|
||||||
assert request.url == expected_url.format(ASGI_HOST)
|
assert request.url == expected_url.format(ASGI_HOST)
|
||||||
|
|
||||||
parsed = urlparse(request.url)
|
parsed = urlparse(request.url)
|
||||||
|
@ -1087,7 +1087,7 @@ def test_url_attributes_with_ssl_context(app, path, query, expected_url):
|
||||||
app.add_route(handler, path)
|
app.add_route(handler, path)
|
||||||
|
|
||||||
request, response = app.test_client.get(
|
request, response = app.test_client.get(
|
||||||
"https://{}:{}".format(HOST, PORT) + path + "?{}".format(query),
|
f"https://{HOST}:{PORT}" + path + f"?{query}",
|
||||||
server_kwargs={"ssl": context},
|
server_kwargs={"ssl": context},
|
||||||
)
|
)
|
||||||
assert request.url == expected_url.format(HOST, PORT)
|
assert request.url == expected_url.format(HOST, PORT)
|
||||||
|
@ -1122,7 +1122,7 @@ def test_url_attributes_with_ssl_dict(app, path, query, expected_url):
|
||||||
app.add_route(handler, path)
|
app.add_route(handler, path)
|
||||||
|
|
||||||
request, response = app.test_client.get(
|
request, response = app.test_client.get(
|
||||||
"https://{}:{}".format(HOST, PORT) + path + "?{}".format(query),
|
f"https://{HOST}:{PORT}" + path + f"?{query}",
|
||||||
server_kwargs={"ssl": ssl_dict},
|
server_kwargs={"ssl": ssl_dict},
|
||||||
)
|
)
|
||||||
assert request.url == expected_url.format(HOST, PORT)
|
assert request.url == expected_url.format(HOST, PORT)
|
||||||
|
|
|
@ -297,7 +297,7 @@ def test_file_response(app, file_name, static_file_directory, status):
|
||||||
mime_type=guess_type(file_path)[0] or "text/plain",
|
mime_type=guess_type(file_path)[0] or "text/plain",
|
||||||
)
|
)
|
||||||
|
|
||||||
request, response = app.test_client.get("/files/{}".format(file_name))
|
request, response = app.test_client.get(f"/files/{file_name}")
|
||||||
assert response.status == status
|
assert response.status == status
|
||||||
assert response.body == get_file_content(static_file_directory, file_name)
|
assert response.body == get_file_content(static_file_directory, file_name)
|
||||||
assert "Content-Disposition" not in response.headers
|
assert "Content-Disposition" not in response.headers
|
||||||
|
@ -320,12 +320,12 @@ def test_file_response_custom_filename(
|
||||||
file_path = os.path.abspath(unquote(file_path))
|
file_path = os.path.abspath(unquote(file_path))
|
||||||
return file(file_path, filename=dest)
|
return file(file_path, filename=dest)
|
||||||
|
|
||||||
request, response = app.test_client.get("/files/{}".format(source))
|
request, response = app.test_client.get(f"/files/{source}")
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert response.body == get_file_content(static_file_directory, source)
|
assert response.body == get_file_content(static_file_directory, source)
|
||||||
assert response.headers[
|
assert response.headers[
|
||||||
"Content-Disposition"
|
"Content-Disposition"
|
||||||
] == 'attachment; filename="{}"'.format(dest)
|
] == f'attachment; filename="{dest}"'
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt"])
|
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt"])
|
||||||
|
@ -350,7 +350,7 @@ def test_file_head_response(app, file_name, static_file_directory):
|
||||||
mime_type=guess_type(file_path)[0] or "text/plain",
|
mime_type=guess_type(file_path)[0] or "text/plain",
|
||||||
)
|
)
|
||||||
|
|
||||||
request, response = app.test_client.head("/files/{}".format(file_name))
|
request, response = app.test_client.head(f"/files/{file_name}")
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert "Accept-Ranges" in response.headers
|
assert "Accept-Ranges" in response.headers
|
||||||
assert "Content-Length" in response.headers
|
assert "Content-Length" in response.headers
|
||||||
|
@ -373,7 +373,7 @@ def test_file_stream_response(app, file_name, static_file_directory):
|
||||||
mime_type=guess_type(file_path)[0] or "text/plain",
|
mime_type=guess_type(file_path)[0] or "text/plain",
|
||||||
)
|
)
|
||||||
|
|
||||||
request, response = app.test_client.get("/files/{}".format(file_name))
|
request, response = app.test_client.get(f"/files/{file_name}")
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert response.body == get_file_content(static_file_directory, file_name)
|
assert response.body == get_file_content(static_file_directory, file_name)
|
||||||
assert "Content-Disposition" not in response.headers
|
assert "Content-Disposition" not in response.headers
|
||||||
|
@ -396,12 +396,12 @@ def test_file_stream_response_custom_filename(
|
||||||
file_path = os.path.abspath(unquote(file_path))
|
file_path = os.path.abspath(unquote(file_path))
|
||||||
return file_stream(file_path, chunk_size=32, filename=dest)
|
return file_stream(file_path, chunk_size=32, filename=dest)
|
||||||
|
|
||||||
request, response = app.test_client.get("/files/{}".format(source))
|
request, response = app.test_client.get(f"/files/{source}")
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert response.body == get_file_content(static_file_directory, source)
|
assert response.body == get_file_content(static_file_directory, source)
|
||||||
assert response.headers[
|
assert response.headers[
|
||||||
"Content-Disposition"
|
"Content-Disposition"
|
||||||
] == 'attachment; filename="{}"'.format(dest)
|
] == f'attachment; filename="{dest}"'
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt"])
|
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt"])
|
||||||
|
@ -429,7 +429,7 @@ def test_file_stream_head_response(app, file_name, static_file_directory):
|
||||||
mime_type=guess_type(file_path)[0] or "text/plain",
|
mime_type=guess_type(file_path)[0] or "text/plain",
|
||||||
)
|
)
|
||||||
|
|
||||||
request, response = app.test_client.head("/files/{}".format(file_name))
|
request, response = app.test_client.head(f"/files/{file_name}")
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
# A HEAD request should never be streamed/chunked.
|
# A HEAD request should never be streamed/chunked.
|
||||||
if "Transfer-Encoding" in response.headers:
|
if "Transfer-Encoding" in response.headers:
|
||||||
|
@ -467,13 +467,10 @@ def test_file_stream_response_range(
|
||||||
_range=range,
|
_range=range,
|
||||||
)
|
)
|
||||||
|
|
||||||
request, response = app.test_client.get("/files/{}".format(file_name))
|
request, response = app.test_client.get(f"/files/{file_name}")
|
||||||
print(response.body)
|
|
||||||
assert response.status == 206
|
assert response.status == 206
|
||||||
assert "Content-Range" in response.headers
|
assert "Content-Range" in response.headers
|
||||||
assert response.headers["Content-Range"] == "bytes {}-{}/{}".format(
|
assert response.headers["Content-Range"] == f"bytes {range.start}-{range.end}/{range.total}"
|
||||||
range.start, range.end, range.total
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_raw_response(app):
|
def test_raw_response(app):
|
||||||
|
|
|
@ -20,17 +20,17 @@ def test_versioned_routes_get(app, method):
|
||||||
func = getattr(app, method)
|
func = getattr(app, method)
|
||||||
if callable(func):
|
if callable(func):
|
||||||
|
|
||||||
@func("/{}".format(method), version=1)
|
@func(f"/{method}", version=1)
|
||||||
def handler(request):
|
def handler(request):
|
||||||
return text("OK")
|
return text("OK")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
print(func)
|
print(func)
|
||||||
raise Exception("Method: {} is not callable".format(method))
|
raise Exception(f"Method: {method} is not callable")
|
||||||
|
|
||||||
client_method = getattr(app.test_client, method)
|
client_method = getattr(app.test_client, method)
|
||||||
|
|
||||||
request, response = client_method("/v1/{}".format(method))
|
request, response = client_method(f"/v1/{method}")
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
|
|
||||||
|
|
||||||
|
@ -163,7 +163,7 @@ def test_route_optional_slash(app):
|
||||||
def test_route_strict_slashes_set_to_false_and_host_is_a_list(app):
|
def test_route_strict_slashes_set_to_false_and_host_is_a_list(app):
|
||||||
# Part of regression test for issue #1120
|
# Part of regression test for issue #1120
|
||||||
|
|
||||||
site1 = "127.0.0.1:{}".format(app.test_client.port)
|
site1 = f"127.0.0.1:{app.test_client.port}"
|
||||||
|
|
||||||
# before fix, this raises a RouteExists error
|
# before fix, this raises a RouteExists error
|
||||||
@app.get("/get", host=[site1, "site2.com"], strict_slashes=False)
|
@app.get("/get", host=[site1, "site2.com"], strict_slashes=False)
|
||||||
|
@ -393,7 +393,8 @@ def test_dynamic_route_uuid(app):
|
||||||
assert response.text == "OK"
|
assert response.text == "OK"
|
||||||
assert type(results[0]) is uuid.UUID
|
assert type(results[0]) is uuid.UUID
|
||||||
|
|
||||||
request, response = app.test_client.get("/quirky/{}".format(uuid.uuid4()))
|
generated_uuid = uuid.uuid4()
|
||||||
|
request, response = app.test_client.get(f"/quirky/{generated_uuid}")
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
|
|
||||||
request, response = app.test_client.get("/quirky/non-existing")
|
request, response = app.test_client.get("/quirky/non-existing")
|
||||||
|
@ -794,7 +795,7 @@ def test_remove_inexistent_route(app):
|
||||||
with pytest.raises(RouteDoesNotExist) as excinfo:
|
with pytest.raises(RouteDoesNotExist) as excinfo:
|
||||||
app.remove_route(uri)
|
app.remove_route(uri)
|
||||||
|
|
||||||
assert str(excinfo.value) == "Route was not registered: {}".format(uri)
|
assert str(excinfo.value) == f"Route was not registered: {uri}"
|
||||||
|
|
||||||
|
|
||||||
def test_removing_slash(app):
|
def test_removing_slash(app):
|
||||||
|
|
|
@ -22,7 +22,7 @@ skipif_no_alarm = pytest.mark.skipif(
|
||||||
|
|
||||||
def create_listener(listener_name, in_list):
|
def create_listener(listener_name, in_list):
|
||||||
async def _listener(app, loop):
|
async def _listener(app, loop):
|
||||||
print("DEBUG MESSAGE FOR PYTEST for {}".format(listener_name))
|
print(f"DEBUG MESSAGE FOR PYTEST for {listener_name}")
|
||||||
in_list.insert(0, app.name + listener_name)
|
in_list.insert(0, app.name + listener_name)
|
||||||
|
|
||||||
return _listener
|
return _listener
|
||||||
|
|
|
@ -98,7 +98,7 @@ def test_static_directory(app, file_name, base_uri, static_file_directory):
|
||||||
app.static(base_uri, static_file_directory)
|
app.static(base_uri, static_file_directory)
|
||||||
|
|
||||||
request, response = app.test_client.get(
|
request, response = app.test_client.get(
|
||||||
uri="{}/{}".format(base_uri, file_name)
|
uri=f"{base_uri}/{file_name}"
|
||||||
)
|
)
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert response.body == get_file_content(static_file_directory, file_name)
|
assert response.body == get_file_content(static_file_directory, file_name)
|
||||||
|
@ -234,7 +234,7 @@ def test_static_content_range_invalid_unit(
|
||||||
)
|
)
|
||||||
|
|
||||||
unit = "bit"
|
unit = "bit"
|
||||||
headers = {"Range": "{}=1-0".format(unit)}
|
headers = {"Range": f"{unit}=1-0"}
|
||||||
request, response = app.test_client.get("/testing.file", headers=headers)
|
request, response = app.test_client.get("/testing.file", headers=headers)
|
||||||
|
|
||||||
assert response.status == 416
|
assert response.status == 416
|
||||||
|
@ -252,7 +252,7 @@ def test_static_content_range_invalid_start(
|
||||||
)
|
)
|
||||||
|
|
||||||
start = "start"
|
start = "start"
|
||||||
headers = {"Range": "bytes={}-0".format(start)}
|
headers = {"Range": f"bytes={start}-0"}
|
||||||
request, response = app.test_client.get("/testing.file", headers=headers)
|
request, response = app.test_client.get("/testing.file", headers=headers)
|
||||||
|
|
||||||
assert response.status == 416
|
assert response.status == 416
|
||||||
|
@ -270,7 +270,7 @@ def test_static_content_range_invalid_end(
|
||||||
)
|
)
|
||||||
|
|
||||||
end = "end"
|
end = "end"
|
||||||
headers = {"Range": "bytes=1-{}".format(end)}
|
headers = {"Range": f"bytes=1-{end}"}
|
||||||
request, response = app.test_client.get("/testing.file", headers=headers)
|
request, response = app.test_client.get("/testing.file", headers=headers)
|
||||||
|
|
||||||
assert response.status == 416
|
assert response.status == 416
|
||||||
|
@ -373,7 +373,7 @@ def test_file_not_found(app, static_file_directory):
|
||||||
def test_static_name(app, static_file_directory, static_name, file_name):
|
def test_static_name(app, static_file_directory, static_name, file_name):
|
||||||
app.static("/static", static_file_directory, name=static_name)
|
app.static("/static", static_file_directory, name=static_name)
|
||||||
|
|
||||||
request, response = app.test_client.get("/static/{}".format(file_name))
|
request, response = app.test_client.get(f"/static/{file_name}")
|
||||||
|
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,3 @@
|
||||||
import socket
|
|
||||||
|
|
||||||
from sanic.response import json, text
|
from sanic.response import json, text
|
||||||
from sanic.testing import PORT, SanicTestClient
|
from sanic.testing import PORT, SanicTestClient
|
||||||
|
|
||||||
|
|
|
@ -20,30 +20,24 @@ URL_FOR_ARGS3 = dict(
|
||||||
arg1="v1",
|
arg1="v1",
|
||||||
_anchor="anchor",
|
_anchor="anchor",
|
||||||
_scheme="http",
|
_scheme="http",
|
||||||
_server="{}:{}".format(test_host, test_port),
|
_server=f"{test_host}:{test_port}",
|
||||||
_external=True,
|
_external=True,
|
||||||
)
|
)
|
||||||
URL_FOR_VALUE3 = "http://{}:{}/myurl?arg1=v1#anchor".format(
|
URL_FOR_VALUE3 = f"http://{test_host}:{test_port}/myurl?arg1=v1#anchor"
|
||||||
test_host, test_port
|
|
||||||
)
|
|
||||||
URL_FOR_ARGS4 = dict(
|
URL_FOR_ARGS4 = dict(
|
||||||
arg1="v1",
|
arg1="v1",
|
||||||
_anchor="anchor",
|
_anchor="anchor",
|
||||||
_external=True,
|
_external=True,
|
||||||
_server="http://{}:{}".format(test_host, test_port),
|
_server=f"http://{test_host}:{test_port}",
|
||||||
)
|
|
||||||
URL_FOR_VALUE4 = "http://{}:{}/myurl?arg1=v1#anchor".format(
|
|
||||||
test_host, test_port
|
|
||||||
)
|
)
|
||||||
|
URL_FOR_VALUE4 = f"http://{test_host}:{test_port}/myurl?arg1=v1#anchor"
|
||||||
|
|
||||||
|
|
||||||
def _generate_handlers_from_names(app, l):
|
def _generate_handlers_from_names(app, l):
|
||||||
for name in l:
|
for name in l:
|
||||||
# this is the easiest way to generate functions with dynamic names
|
# this is the easiest way to generate functions with dynamic names
|
||||||
exec(
|
exec(
|
||||||
'@app.route(name)\ndef {}(request):\n\treturn text("{}")'.format(
|
f'@app.route(name)\ndef {name}(request):\n\treturn text("{name}")'
|
||||||
name, name
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -60,7 +54,7 @@ def test_simple_url_for_getting(simple_app):
|
||||||
for letter in string.ascii_letters:
|
for letter in string.ascii_letters:
|
||||||
url = simple_app.url_for(letter)
|
url = simple_app.url_for(letter)
|
||||||
|
|
||||||
assert url == "/{}".format(letter)
|
assert url == f"/{letter}"
|
||||||
request, response = simple_app.test_client.get(url)
|
request, response = simple_app.test_client.get(url)
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert response.text == letter
|
assert response.text == letter
|
||||||
|
@ -88,7 +82,7 @@ def test_simple_url_for_getting_with_more_params(app, args, url):
|
||||||
|
|
||||||
def test_url_for_with_server_name(app):
|
def test_url_for_with_server_name(app):
|
||||||
|
|
||||||
server_name = "{}:{}".format(test_host, test_port)
|
server_name = f"{test_host}:{test_port}"
|
||||||
app.config.update({"SERVER_NAME": server_name})
|
app.config.update({"SERVER_NAME": server_name})
|
||||||
path = "/myurl"
|
path = "/myurl"
|
||||||
|
|
||||||
|
@ -96,7 +90,7 @@ def test_url_for_with_server_name(app):
|
||||||
def passes(request):
|
def passes(request):
|
||||||
return text("this should pass")
|
return text("this should pass")
|
||||||
|
|
||||||
url = "http://{}{}".format(server_name, path)
|
url = f"http://{server_name}{path}"
|
||||||
assert url == app.url_for("passes", _server=None, _external=True)
|
assert url == app.url_for("passes", _server=None, _external=True)
|
||||||
request, response = app.test_client.get(url)
|
request, response = app.test_client.get(url)
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
|
@ -118,7 +112,7 @@ def test_fails_url_build_if_param_not_passed(app):
|
||||||
url = "/"
|
url = "/"
|
||||||
|
|
||||||
for letter in string.ascii_letters:
|
for letter in string.ascii_letters:
|
||||||
url += "<{}>/".format(letter)
|
url += f"<{letter}>/"
|
||||||
|
|
||||||
@app.route(url)
|
@app.route(url)
|
||||||
def fail(request):
|
def fail(request):
|
||||||
|
@ -182,7 +176,7 @@ def test_passes_with_negative_int_message(app):
|
||||||
@app.route("path/<possibly_neg:int>/another-word")
|
@app.route("path/<possibly_neg:int>/another-word")
|
||||||
def good(request, possibly_neg):
|
def good(request, possibly_neg):
|
||||||
assert isinstance(possibly_neg, int)
|
assert isinstance(possibly_neg, int)
|
||||||
return text("this should pass with `{}`".format(possibly_neg))
|
return text(f"this should pass with `{possibly_neg}`")
|
||||||
|
|
||||||
u_plus_3 = app.url_for("good", possibly_neg=3)
|
u_plus_3 = app.url_for("good", possibly_neg=3)
|
||||||
assert u_plus_3 == "/path/3/another-word", u_plus_3
|
assert u_plus_3 == "/path/3/another-word", u_plus_3
|
||||||
|
@ -237,13 +231,13 @@ def test_passes_with_negative_number_message(app, number):
|
||||||
@app.route("path/<possibly_neg:number>/another-word")
|
@app.route("path/<possibly_neg:number>/another-word")
|
||||||
def good(request, possibly_neg):
|
def good(request, possibly_neg):
|
||||||
assert isinstance(possibly_neg, (int, float))
|
assert isinstance(possibly_neg, (int, float))
|
||||||
return text("this should pass with `{}`".format(possibly_neg))
|
return text(f"this should pass with `{possibly_neg}`")
|
||||||
|
|
||||||
u = app.url_for("good", possibly_neg=number)
|
u = app.url_for("good", possibly_neg=number)
|
||||||
assert u == "/path/{}/another-word".format(number), u
|
assert u == f"/path/{number}/another-word", u
|
||||||
request, response = app.test_client.get(u)
|
request, response = app.test_client.get(u)
|
||||||
# For ``number``, it has been cast to a float - so a ``3`` becomes a ``3.0``
|
# For ``number``, it has been cast to a float - so a ``3`` becomes a ``3.0``
|
||||||
assert response.text == "this should pass with `{}`".format(float(number))
|
assert response.text == f"this should pass with `{float(number)}`"
|
||||||
|
|
||||||
|
|
||||||
def test_adds_other_supplied_values_as_query_string(app):
|
def test_adds_other_supplied_values_as_query_string(app):
|
||||||
|
@ -275,7 +269,7 @@ def blueprint_app(app):
|
||||||
|
|
||||||
@first_print.route("/foo/<param>")
|
@first_print.route("/foo/<param>")
|
||||||
def foo_with_param(request, param):
|
def foo_with_param(request, param):
|
||||||
return text("foo from first : {}".format(param))
|
return text(f"foo from first : {param}")
|
||||||
|
|
||||||
@second_print.route("/foo") # noqa
|
@second_print.route("/foo") # noqa
|
||||||
def foo(request):
|
def foo(request):
|
||||||
|
@ -283,7 +277,7 @@ def blueprint_app(app):
|
||||||
|
|
||||||
@second_print.route("/foo/<param>") # noqa
|
@second_print.route("/foo/<param>") # noqa
|
||||||
def foo_with_param(request, param):
|
def foo_with_param(request, param):
|
||||||
return text("foo from second : {}".format(param))
|
return text(f"foo from second : {param}")
|
||||||
|
|
||||||
app.blueprint(first_print)
|
app.blueprint(first_print)
|
||||||
app.blueprint(second_print)
|
app.blueprint(second_print)
|
||||||
|
|
12
tests/test_url_for.py
Normal file
12
tests/test_url_for.py
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
def test_routes_with_host(app):
|
||||||
|
@app.route("/")
|
||||||
|
@app.route("/", name="hostindex", host="example.com")
|
||||||
|
@app.route("/path", name="hostpath", host="path.example.com")
|
||||||
|
def index(request):
|
||||||
|
pass
|
||||||
|
|
||||||
|
assert app.url_for("index") == "/"
|
||||||
|
assert app.url_for("hostindex") == "/"
|
||||||
|
assert app.url_for("hostpath") == "/path"
|
||||||
|
assert app.url_for("hostindex", _external=True) == "http://example.com/"
|
||||||
|
assert app.url_for("hostpath", _external=True) == "http://path.example.com/path"
|
|
@ -118,7 +118,7 @@ def test_static_directory(app, file_name, base_uri, static_file_directory):
|
||||||
app.static(base_uri2, static_file_directory, name="uploads")
|
app.static(base_uri2, static_file_directory, name="uploads")
|
||||||
|
|
||||||
uri = app.url_for("static", name="static", filename=file_name)
|
uri = app.url_for("static", name="static", filename=file_name)
|
||||||
assert uri == "{}/{}".format(base_uri, file_name)
|
assert uri == f"{base_uri}/{file_name}"
|
||||||
|
|
||||||
request, response = app.test_client.get(uri)
|
request, response = app.test_client.get(uri)
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
|
@ -134,7 +134,7 @@ def test_static_directory(app, file_name, base_uri, static_file_directory):
|
||||||
assert uri2 == uri3
|
assert uri2 == uri3
|
||||||
assert uri3 == uri4
|
assert uri3 == uri4
|
||||||
|
|
||||||
assert uri5 == "{}/{}".format(base_uri2, file_name)
|
assert uri5 == f"{base_uri2}/{file_name}"
|
||||||
assert uri5 == uri6
|
assert uri5 == uri6
|
||||||
|
|
||||||
bp = Blueprint("test_bp_static", url_prefix="/bp")
|
bp = Blueprint("test_bp_static", url_prefix="/bp")
|
||||||
|
@ -157,10 +157,10 @@ def test_static_directory(app, file_name, base_uri, static_file_directory):
|
||||||
"static", name="test_bp_static.uploads", filename="/" + file_name
|
"static", name="test_bp_static.uploads", filename="/" + file_name
|
||||||
)
|
)
|
||||||
|
|
||||||
assert uri == "/bp{}/{}".format(base_uri, file_name)
|
assert uri == f"/bp{base_uri}/{file_name}"
|
||||||
assert uri == uri2
|
assert uri == uri2
|
||||||
|
|
||||||
assert uri4 == "/bp{}/{}".format(base_uri2, file_name)
|
assert uri4 == f"/bp{base_uri2}/{file_name}"
|
||||||
assert uri4 == uri5
|
assert uri4 == uri5
|
||||||
|
|
||||||
request, response = app.test_client.get(uri)
|
request, response = app.test_client.get(uri)
|
||||||
|
|
|
@ -147,8 +147,8 @@ def test_with_custom_class_methods(app):
|
||||||
def get(self, request):
|
def get(self, request):
|
||||||
self._iternal_method()
|
self._iternal_method()
|
||||||
return text(
|
return text(
|
||||||
"I am get method and global var "
|
f"I am get method and global var "
|
||||||
"is {}".format(self.global_var)
|
f"is {self.global_var}"
|
||||||
)
|
)
|
||||||
|
|
||||||
app.add_route(DummyView.as_view(), "/")
|
app.add_route(DummyView.as_view(), "/")
|
||||||
|
|
Loading…
Reference in New Issue
Block a user