sanic/tests/test_response.py

693 lines
22 KiB
Python
Raw Normal View History

2017-02-21 16:05:06 +00:00
import asyncio
import inspect
import os
2018-12-13 17:50:50 +00:00
from collections import namedtuple
from logging import ERROR, LogRecord
from mimetypes import guess_type
from random import choice
from typing import Callable, List
from urllib.parse import unquote
2017-02-21 16:05:06 +00:00
import pytest
from aiofiles import os as async_os
from pytest import LogCaptureFixture
2016-12-25 02:47:15 +00:00
from sanic import Request, Sanic
2018-12-30 11:18:06 +00:00
from sanic.response import (
HTTPResponse,
empty,
2018-12-30 11:18:06 +00:00
file,
file_stream,
json,
raw,
stream,
More robust response datatype handling (#1674) * HTTP1 header formatting moved to headers.format_headers and rewritten. - New implementation is one line of code and twice faster than the old one. - Whole header block encoded to UTF-8 in one pass. - No longer supports custom encode method on header values. - Cookie objects now have __str__ in addition to encode, to work with this. * Linter * format_http1_response * Replace encode_body with faster implementation based on f-string. Benchmarks: def encode_body(data): try: # Try to encode it regularly return data.encode() except AttributeError: # Convert it to a str if you can't return str(data).encode() def encode_body2(data): return f"{data}".encode() def encode_body3(data): return str(data).encode() data_str, data_int = "foo", 123 %timeit encode_body(data_int) 928 ns ± 2.96 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each) %timeit encode_body2(data_int) 280 ns ± 2.09 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each) %timeit encode_body3(data_int) 387 ns ± 1.7 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each) %timeit encode_body(data_str) 202 ns ± 1.9 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each) %timeit encode_body2(data_str) 197 ns ± 0.507 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each) %timeit encode_body3(data_str) 313 ns ± 1.28 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each) * Wtf linter * Content-type fixes. * Body encoding sanitation, first pass. - body/data type autodetection fixed. - do not repr(body).encode() bytes-ish values. - support __html__ and _repr_html_ in sanic.response.html(). * <any type>-to-str response autoconversion limited to sanic.response.text() only. * Workaround MyPy issue. * Add an empty line to make isort happy. * Add html test for __html__ and _repr_html_. * Remove StreamingHTTPResponse.get_headers helper function. * Add back HTTPResponse Keep-Alive removed by earlier merge or something. 
* Revert "Remove StreamingHTTPResponse.get_headers helper function." Tests depend on this otherwise useless function. This reverts commit 9651e6ae017b61bed6dd88af6631cdd6b01eb347. * Add deprecation warnings; instead of assert for wrong HTTP version, and for non-string response.text. * Add back missing import. * Avoid duplicate response header tweaking code. * Linter errors
2020-01-20 16:34:32 +00:00
text,
2018-12-30 11:18:06 +00:00
)
2016-12-25 02:47:15 +00:00
2018-12-30 11:18:06 +00:00
# Payload returned by the JSON routes registered in the ``json_app`` fixture.
JSON_DATA = {"ok": True}
@pytest.mark.filterwarnings("ignore:Types other than str will be")
def test_response_body_not_a_string(app):
    """Passing a non-string body to ``text()`` must produce a 500 response."""
    non_string_body = choice(range(1000))

    @app.route("/hello")
    async def hello_route(request: Request):
        return text(non_string_body)

    _, response = app.test_client.get("/hello")

    assert response.status == 500
    assert b"Internal Server Error" in response.body
2017-02-21 16:05:06 +00:00
async def sample_streaming_fn(response):
    """Write ``"foo,"`` and ``"bar"`` to *response* with a short pause between.

    *response* only needs an awaitable ``write`` method.
    """
    first_chunk, second_chunk = "foo,", "bar"
    await response.write(first_chunk)
    # Yield to the event loop so the two chunks are written separately.
    await asyncio.sleep(0.001)
    await response.write(second_chunk)
2017-02-21 16:05:06 +00:00
2021-02-08 10:18:29 +00:00
def test_method_not_allowed():
    """Check the ``Allow`` header reported for methods a route lacks.

    With only a GET route registered, HEAD and POST must advertise ``GET``.
    After resetting the router and adding a POST route, HEAD and PATCH must
    be rejected with 405, advertise ``GET`` and ``POST`` and carry an empty
    body.
    """
    app = Sanic("app")

    @app.get("/")
    async def test_get(request: Request):
        # Use the imported ``json`` helper: ``response`` in the enclosing
        # scope is the test client's response object, not a response module.
        return json({"hello": "world"})

    request, response = app.test_client.head("/")
    assert set(response.headers["Allow"].split(", ")) == {
        "GET",
    }

    request, response = app.test_client.post("/")
    assert set(response.headers["Allow"].split(", ")) == {
        "GET",
    }

    # Unlock the router so another route can be registered after requests
    # have already been served.
    app.router.reset()

    @app.post("/")
    async def test_post(request: Request):
        return json({"hello": "world"})

    request, response = app.test_client.head("/")
    assert response.status == 405
    assert set(response.headers["Allow"].split(", ")) == {
        "GET",
        "POST",
    }
    assert response.headers["Content-Length"] == "0"

    request, response = app.test_client.patch("/")
    assert response.status == 405
    assert set(response.headers["Allow"].split(", ")) == {
        "GET",
        "POST",
    }
    assert response.headers["Content-Length"] == "0"
2017-02-21 16:05:06 +00:00
2018-08-26 15:43:14 +01:00
def test_response_header(app):
    """A JSON response with an explicit ``CONTENT-TYPE`` header is returned
    with exactly the connection, content-length and content-type headers."""
    expected_headers = {
        "connection": "keep-alive",
        "content-length": "11",
        "content-type": "application/json",
    }

    @app.get("/")
    async def test(request: Request):
        return json({"ok": True}, headers={"CONTENT-TYPE": "application/json"})

    _, response = app.test_client.get("/")

    assert dict(response.headers) == expected_headers
def test_response_content_length(app):
    """Both JSON routes must report the same ``Content-Length`` of 43."""

    @app.get("/response_with_space")
    async def response_with_space(request: Request):
        return json(
            {"message": "Data", "details": "Some Details"},
            headers={"CONTENT-TYPE": "application/json"},
        )

    @app.get("/response_without_space")
    async def response_without_space(request: Request):
        return json(
            {"message": "Data", "details": "Some Details"},
            headers={"CONTENT-TYPE": "application/json"},
        )

    # Request the routes in the same order as before and keep only the
    # reported length of each response.
    with_space, without_space = [
        app.test_client.get(path)[1].headers.get("Content-Length")
        for path in ("/response_with_space", "/response_without_space")
    ]

    assert with_space == without_space
    assert with_space == "43"
def test_response_content_length_with_different_data_types(app):
    """A JSON body mixing bool, ``None``, str and int values is 55 bytes."""

    @app.get("/")
    async def get_data_with_different_types(request: Request):
        # Indentation issues in the response are intentional. Please do not fix
        payload = {"bool": True, "none": None, "string": "string", "number": -1}
        return json(payload, headers={"CONTENT-TYPE": "application/json"})

    _, response = app.test_client.get("/")

    reported_length = response.headers.get("Content-Length")
    assert reported_length == "55"
@pytest.fixture
def json_app(app):
    """Return *app* with JSON routes covering the 200, 204 and 304 statuses."""

    @app.route("/")
    async def test(request: Request):
        return json(JSON_DATA)

    # 204 with a payload handed to ``json()``.
    @app.get("/no-content")
    async def no_content_handler(request: Request):
        return json(JSON_DATA, status=204)

    # 304 without a payload.
    @app.get("/no-content/unmodified")
    async def no_content_unmodified_handler(request: Request):
        return json(None, status=304)

    # 304 with a payload handed to ``json()``.
    @app.get("/unmodified")
    async def unmodified_handler(request: Request):
        return json(JSON_DATA, status=304)

    # 204 without a payload.
    @app.delete("/")
    async def delete_handler(request: Request):
        return json(None, status=204)

    return app
def test_json_response(json_app):
    """The root route returns ``JSON_DATA`` serialised with ``json_dumps``."""
    from sanic.response import json_dumps

    _, response = json_app.test_client.get("/")

    assert response.status == 200
    expected_text = json_dumps(JSON_DATA)
    assert response.text == expected_text
    assert response.json == JSON_DATA
def test_no_content(json_app):
    """204 and 304 responses have an empty body and no ``Content-Length``;
    the 304 responses additionally carry no ``Content-Type``."""
    client = json_app.test_client

    _, response = client.get("/no-content")
    assert response.status == 204
    assert response.text == ""
    assert "Content-Length" not in response.headers

    _, response = client.get("/no-content/unmodified")
    assert response.status == 304
    assert response.text == ""
    assert "Content-Length" not in response.headers
    assert "Content-Type" not in response.headers

    _, response = client.get("/unmodified")
    assert response.status == 304
    assert response.text == ""
    assert "Content-Length" not in response.headers
    assert "Content-Type" not in response.headers

    _, response = client.delete("/")
    assert response.status == 204
    assert response.text == ""
    assert "Content-Length" not in response.headers
2017-02-21 16:05:06 +00:00
@pytest.fixture
def streaming_app(app):
    """Return *app* with a root route streaming ``sample_streaming_fn`` as CSV."""

    @app.route("/")
    async def test(request: Request):
        streamed = stream(sample_streaming_fn, content_type="text/csv")
        return streamed

    return app
@pytest.fixture
def non_chunked_streaming_app(app):
    """Return *app* with a root route streaming ``sample_streaming_fn`` as CSV
    while declaring an explicit ``Content-Length`` of 7."""

    @app.route("/")
    async def test(request: Request):
        fixed_length_headers = {"Content-Length": "7"}
        return stream(
            sample_streaming_fn,
            headers=fixed_length_headers,
            content_type="text/csv",
        )

    return app
2019-04-20 20:27:10 +01:00
def test_chunked_streaming_adds_correct_headers(streaming_app):
    """A streamed response is sent chunked, as CSV, with no ``Content-Length``."""
    _, response = streaming_app.test_client.get("/")
    received = response.headers

    assert received["Transfer-Encoding"] == "chunked"
    assert received["Content-Type"] == "text/csv"
    # Content-Length is not allowed by HTTP/1.1 specification
    # when "Transfer-Encoding: chunked" is used
    assert "Content-Length" not in received
2017-02-21 16:05:06 +00:00
2019-04-20 20:27:10 +01:00
def test_chunked_streaming_returns_correct_content(streaming_app):
    """The chunks written by the streaming function arrive concatenated."""
    _, response = streaming_app.test_client.get("/")

    assert response.text == "foo,bar"
2017-02-21 16:05:06 +00:00
@pytest.mark.asyncio
async def test_chunked_streaming_returns_correct_content_asgi(streaming_app):
    """Through the ASGI client the streamed body is ``b"foo,bar"``."""
    _, response = await streaming_app.asgi_client.get("/")

    assert response.body == b"foo,bar"
2019-05-21 23:42:19 +01:00
def test_non_chunked_streaming_adds_correct_headers(non_chunked_streaming_app):
    """With an explicit ``Content-Length`` the stream is not sent chunked."""
    _, response = non_chunked_streaming_app.test_client.get("/")
    received = response.headers

    assert "Transfer-Encoding" not in received
    assert received["Content-Type"] == "text/csv"
    assert received["Content-Length"] == "7"
@pytest.mark.asyncio
async def test_non_chunked_streaming_adds_correct_headers_asgi(
    non_chunked_streaming_app,
):
    """Through the ASGI client an explicit ``Content-Length`` is kept and the
    response is not sent chunked."""
    _, response = await non_chunked_streaming_app.asgi_client.get("/")
    received = response.headers

    assert "Transfer-Encoding" not in received
    assert received["Content-Type"] == "text/csv"
    assert received["Content-Length"] == "7"
2019-04-20 20:27:10 +01:00
def test_non_chunked_streaming_returns_correct_content(
    non_chunked_streaming_app,
):
    """The non-chunked stream delivers the full ``"foo,bar"`` body."""
    _, response = non_chunked_streaming_app.test_client.get("/")

    assert response.text == "foo,bar"
def test_stream_response_with_cookies(app):
    """The last value assigned to a cookie on a streamed response wins."""

    @app.route("/")
    async def test(request: Request):
        streamed = stream(sample_streaming_fn, content_type="text/csv")
        # Assign the same cookie twice; only the second value should be sent.
        for cookie_value in ("modified", "pass"):
            streamed.cookies["test"] = cookie_value
        return streamed

    _, response = app.test_client.get("/")

    assert response.cookies["test"] == "pass"
def test_stream_response_without_cookies(app):
    """A streamed response with no cookies set yields an empty cookie mapping."""

    @app.route("/")
    async def test(request: Request):
        return stream(sample_streaming_fn, content_type="text/csv")

    _, response = app.test_client.get("/")

    assert response.cookies == {}
@pytest.fixture
def static_file_directory():
    """Return the absolute path of the ``static`` directory beside this file."""
    this_file = os.path.abspath(inspect.getfile(inspect.currentframe()))
    return os.path.join(os.path.dirname(this_file), "static")
def get_file_content(static_file_directory, file_name):
    """Return the raw bytes of *file_name* inside *static_file_directory*.

    The file is opened in binary mode so the result can be compared directly
    with a response body.
    """
    # The handle is not called ``file`` so it does not shadow the ``file``
    # response helper imported at module level.
    with open(os.path.join(static_file_directory, file_name), "rb") as handle:
        return handle.read()
2018-02-02 13:05:57 +00:00
2018-12-30 11:18:06 +00:00
@pytest.mark.parametrize(
    "file_name", ["test.file", "decode me.txt", "python.png"]
)
@pytest.mark.parametrize("status", [200, 401])
def test_file_response(app: Sanic, file_name, static_file_directory, status):
    """``file()`` serves the file bytes with the requested status and adds no
    ``Content-Disposition`` header."""

    @app.route("/files/<filename>", methods=["GET"])
    def file_route(request, filename):
        target = os.path.abspath(
            unquote(os.path.join(static_file_directory, filename))
        )
        return file(
            target,
            status=status,
            mime_type=guess_type(target)[0] or "text/plain",
        )

    _, response = app.test_client.get(f"/files/{file_name}")

    assert response.status == status
    assert response.body == get_file_content(static_file_directory, file_name)
    assert "Content-Disposition" not in response.headers
2018-02-02 13:05:57 +00:00
2018-10-22 21:25:38 +01:00
@pytest.mark.parametrize(
    "source,dest",
    [
        ("test.file", "my_file.txt"),
        ("decode me.txt", "readme.md"),
        ("python.png", "logo.png"),
    ],
)
def test_file_response_custom_filename(
    app: Sanic, source, dest, static_file_directory
):
    """Passing ``filename`` to ``file()`` adds an attachment disposition."""

    @app.route("/files/<filename>", methods=["GET"])
    def file_route(request, filename):
        target = os.path.abspath(
            unquote(os.path.join(static_file_directory, filename))
        )
        return file(target, filename=dest)

    _, response = app.test_client.get(f"/files/{source}")

    assert response.status == 200
    assert response.body == get_file_content(static_file_directory, source)
    disposition = response.headers["Content-Disposition"]
    assert disposition == f'attachment; filename="{dest}"'
2018-02-02 13:05:57 +00:00
2018-12-30 11:18:06 +00:00
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt"])
def test_file_head_response(app: Sanic, file_name, static_file_directory):
    """A HEAD request reports ``Accept-Ranges`` and the file's byte size."""

    @app.route("/files/<filename>", methods=["GET", "HEAD"])
    async def file_route(request, filename):
        target = os.path.abspath(
            unquote(os.path.join(static_file_directory, filename))
        )
        stats = await async_os.stat(target)
        headers = {
            "Accept-Ranges": "bytes",
            "Content-Length": str(stats.st_size),
        }
        mime_type = guess_type(target)[0] or "text/plain"

        # HEAD gets a body-less response; any other method gets the file.
        if request.method == "HEAD":
            return HTTPResponse(headers=headers, content_type=mime_type)
        return file(target, headers=headers, mime_type=mime_type)

    _, response = app.test_client.head(f"/files/{file_name}")

    assert response.status == 200
    assert "Accept-Ranges" in response.headers
    assert "Content-Length" in response.headers
    expected_size = len(get_file_content(static_file_directory, file_name))
    assert int(response.headers["Content-Length"]) == expected_size
2018-02-02 13:05:57 +00:00
2018-12-30 11:18:06 +00:00
@pytest.mark.parametrize(
    "file_name", ["test.file", "decode me.txt", "python.png"]
)
def test_file_stream_response(app: Sanic, file_name, static_file_directory):
    """``file_stream()`` delivers the full file in 32-byte chunks and adds no
    ``Content-Disposition`` header."""

    @app.route("/files/<filename>", methods=["GET"])
    def file_route(request, filename):
        target = os.path.abspath(
            unquote(os.path.join(static_file_directory, filename))
        )
        return file_stream(
            target,
            chunk_size=32,
            mime_type=guess_type(target)[0] or "text/plain",
        )

    _, response = app.test_client.get(f"/files/{file_name}")

    assert response.status == 200
    assert response.body == get_file_content(static_file_directory, file_name)
    assert "Content-Disposition" not in response.headers
2018-02-02 13:05:57 +00:00
2018-10-22 21:25:38 +01:00
@pytest.mark.parametrize(
    "source,dest",
    [
        ("test.file", "my_file.txt"),
        ("decode me.txt", "readme.md"),
        ("python.png", "logo.png"),
    ],
)
def test_file_stream_response_custom_filename(
    app: Sanic, source, dest, static_file_directory
):
    """Passing ``filename`` to ``file_stream()`` adds an attachment disposition."""

    @app.route("/files/<filename>", methods=["GET"])
    def file_route(request, filename):
        target = os.path.abspath(
            unquote(os.path.join(static_file_directory, filename))
        )
        return file_stream(target, chunk_size=32, filename=dest)

    _, response = app.test_client.get(f"/files/{source}")

    assert response.status == 200
    assert response.body == get_file_content(static_file_directory, source)
    disposition = response.headers["Content-Disposition"]
    assert disposition == f'attachment; filename="{dest}"'
2018-02-02 13:05:57 +00:00
2018-12-30 11:18:06 +00:00
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt"])
def test_file_stream_head_response(
    app: Sanic, file_name, static_file_directory
):
    """A HEAD request to a streaming file route is answered un-chunked, with
    ``Accept-Ranges`` and the file's byte size as ``Content-Length``."""

    @app.route("/files/<filename>", methods=["GET", "HEAD"])
    async def file_route(request, filename):
        target = os.path.abspath(
            unquote(os.path.join(static_file_directory, filename))
        )
        mime_type = guess_type(target)[0] or "text/plain"
        headers = {"Accept-Ranges": "bytes"}

        if request.method != "HEAD":
            return file_stream(
                target,
                chunk_size=32,
                headers=headers,
                mime_type=mime_type,
            )

        # Return a normal HTTPResponse, not a
        # StreamingHTTPResponse for a HEAD request
        stats = await async_os.stat(target)
        headers["Content-Length"] = str(stats.st_size)
        return HTTPResponse(headers=headers, content_type=mime_type)

    _, response = app.test_client.head(f"/files/{file_name}")

    assert response.status == 200
    # A HEAD request should never be streamed/chunked.
    assert response.headers.get("Transfer-Encoding") != "chunked"
    assert "Accept-Ranges" in response.headers
    # A HEAD request should get the Content-Length too
    assert "Content-Length" in response.headers
    expected_size = len(get_file_content(static_file_directory, file_name))
    assert int(response.headers["Content-Length"]) == expected_size
2018-12-13 17:50:50 +00:00
2018-12-30 11:18:06 +00:00
@pytest.mark.parametrize(
    "file_name", ["test.file", "decode me.txt", "python.png"]
)
@pytest.mark.parametrize(
    "size,start,end", [(1024, 0, 1024), (4096, 1024, 8192)]
)
def test_file_stream_response_range(
    app: Sanic, file_name, static_file_directory, size, start, end
):
    """``file_stream()`` given a ``_range`` answers 206 with a matching
    ``Content-Range`` header."""
    Range = namedtuple("Range", ["size", "start", "end", "total"])
    total = len(get_file_content(static_file_directory, file_name))
    # Named ``byte_range`` so the builtin ``range`` is not shadowed.
    byte_range = Range(size=size, start=start, end=end, total=total)

    @app.route("/files/<filename>", methods=["GET"])
    def file_route(request, filename):
        file_path = os.path.join(static_file_directory, filename)
        file_path = os.path.abspath(unquote(file_path))
        return file_stream(
            file_path,
            chunk_size=32,
            mime_type=guess_type(file_path)[0] or "text/plain",
            _range=byte_range,
        )

    request, response = app.test_client.get(f"/files/{file_name}")
    assert response.status == 206
    assert "Content-Range" in response.headers
    assert (
        response.headers["Content-Range"]
        == f"bytes {byte_range.start}-{byte_range.end}/{byte_range.total}"
    )
2018-12-13 17:50:50 +00:00
2018-12-13 17:50:50 +00:00
def test_raw_response(app):
    """``raw()`` sends the bytes unchanged as ``application/octet-stream``."""
    payload = b"raw_response"

    @app.get("/test")
    def handler(request: Request):
        return raw(payload)

    _, response = app.test_client.get("/test")

    assert response.content_type == "application/octet-stream"
    assert response.body == b"raw_response"
def test_empty_response(app):
    """``empty()`` yields a response with no content type and no body."""

    @app.get("/test")
    def handler(request: Request):
        return empty()

    _, response = app.test_client.get("/test")

    assert response.content_type is None
    assert response.body == b""
def test_direct_response_stream(app: Sanic):
    """Data sent through ``request.respond()`` arrives chunked, as CSV, with
    no ``Content-Length``."""

    @app.route("/")
    async def test(request: Request):
        direct = await request.respond(content_type="text/csv")
        for chunk in ("foo,", "bar"):
            await direct.send(chunk)
        await direct.eof()

    _, response = app.test_client.get("/")
    received = response.headers

    assert response.text == "foo,bar"
    assert received["Transfer-Encoding"] == "chunked"
    assert received["Content-Type"] == "text/csv"
    assert "Content-Length" not in received
def test_two_respond_calls(app: Sanic):
    """Send two chunks through one ``request.respond()`` response and check
    that the client receives them concatenated.

    Previously the route was registered but never requested, so the test
    could not fail.
    """

    @app.route("/")
    async def handler(request: Request):
        response = await request.respond()
        await response.send("foo,")
        await response.send("bar")
        await response.eof()

    _, response = app.test_client.get("/")

    assert response.status == 200
    assert response.text == "foo,bar"
def test_multiple_responses(
    app: Sanic,
    caplog: LogCaptureFixture,
    message_in_records: Callable[[List[LogRecord], str], bool],
):
    """Exercise handlers that respond more than once to a single request.

    Routes ``/1``-``/3`` call ``request.respond()`` twice; routes ``/4``-``/6``
    call it once and then also return a JSON response. The test checks the
    status, body and headers the client sees and the error messages logged.

    NOTE(review): ``caplog.records`` is not cleared between the requests
    below, so a message logged for an earlier route also satisfies the
    assertions made for later routes — confirm whether that is intended.
    """

    @app.route("/1")
    async def handler(request: Request):
        response = await request.respond()
        await response.send("foo")
        await request.respond()

    @app.route("/2")
    async def handler(request: Request):
        await request.respond()
        response = await request.respond()
        await response.send("foo")

    @app.get("/3")
    async def handler(request: Request):
        response = await request.respond()
        await response.send("foo,")
        response = await request.respond()
        await response.send("bar")

    @app.get("/4")
    async def handler(request: Request):
        await request.respond(headers={"one": "one"})
        return json({"foo": "bar"}, headers={"one": "two"})

    @app.get("/5")
    async def handler(request: Request):
        response = await request.respond(headers={"one": "one"})
        await response.send("foo")
        return json({"foo": "bar"}, headers={"one": "two"})

    @app.get("/6")
    async def handler(request: Request):
        response = await request.respond(headers={"one": "one"})
        await response.send("foo, ")
        json_response = json({"foo": "bar"}, headers={"one": "two"})
        await response.send("bar")
        return json_response

    error_msg0 = "Second respond call is not allowed."

    error_msg1 = (
        "The error response will not be sent to the client for the following "
        'exception:"Second respond call is not allowed.". A previous '
        "response has at least partially been sent."
    )

    error_msg2 = (
        "The response object returned by the route handler "
        "will not be sent to client. The request has already "
        "been responded to."
    )

    with caplog.at_level(ERROR):
        _, response = app.test_client.get("/1")
        assert response.status == 200
        assert message_in_records(caplog.records, error_msg0)
        assert message_in_records(caplog.records, error_msg1)

    with caplog.at_level(ERROR):
        _, response = app.test_client.get("/2")
        assert response.status == 500
        assert "500 — Internal Server Error" in response.text

    with caplog.at_level(ERROR):
        _, response = app.test_client.get("/3")
        assert response.status == 200
        assert "foo," in response.text
        assert message_in_records(caplog.records, error_msg0)
        assert message_in_records(caplog.records, error_msg1)

    with caplog.at_level(ERROR):
        _, response = app.test_client.get("/4")
        assert response.status == 200
        assert "foo" not in response.text
        assert "one" in response.headers
        assert response.headers["one"] == "one"
        assert message_in_records(caplog.records, error_msg2)

    with caplog.at_level(ERROR):
        _, response = app.test_client.get("/5")
        assert response.status == 200
        assert "foo" in response.text
        assert "one" in response.headers
        assert response.headers["one"] == "one"
        assert message_in_records(caplog.records, error_msg2)

    with caplog.at_level(ERROR):
        _, response = app.test_client.get("/6")
        assert "foo, bar" in response.text
        assert "one" in response.headers
        assert response.headers["one"] == "one"
        assert message_in_records(caplog.records, error_msg2)
def send_response_after_eof_should_fail(
    app: Sanic,
    caplog: LogCaptureFixture,
    message_in_records: Callable[[List[LogRecord], str], bool],
):
    """Send data after ``eof()`` and check the body and the logged errors.

    NOTE(review): the name has no ``test_`` prefix, so pytest's default
    discovery presumably never collects this function — confirm whether
    that is intentional before renaming.
    NOTE(review): the first expected message refers to a second respond
    call, which this handler never makes — verify the expectation.
    """

    @app.get("/")
    async def handler(request: Request):
        direct = await request.respond()
        await direct.send("foo, ")
        await direct.eof()
        # This send happens after the stream has been ended.
        await direct.send("bar")

    second_respond_msg = (
        "The error response will not be sent to the client for the following "
        'exception:"Second respond call is not allowed.". A previous '
        "response has at least partially been sent."
    )

    stream_ended_msg = (
        "Response stream was ended, no more "
        "response data is allowed to be sent."
    )

    with caplog.at_level(ERROR):
        _, response = app.test_client.get("/")
        assert "foo, " in response.text
        for expected_message in (second_respond_msg, stream_ended_msg):
            assert message_in_records(caplog.records, expected_message)