This commit is contained in:
Adam Hopkins
2020-10-25 10:45:22 +02:00
parent c09129ec63
commit eb3d0a3f87
7 changed files with 106 additions and 39 deletions

View File

@@ -230,8 +230,8 @@ async def handler3(request):
def test_keep_alive_timeout_reuse():
"""If the server keep-alive timeout and client keep-alive timeout are
both longer than the delay, the client _and_ server will successfully
reuse the existing connection."""
both longer than the delay, the client _and_ server will successfully
reuse the existing connection."""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

View File

@@ -1,6 +1,7 @@
import asyncio
import inspect
import os
from collections import namedtuple
from mimetypes import guess_type
from random import choice
@@ -8,6 +9,7 @@ from unittest.mock import MagicMock
from urllib.parse import unquote
import pytest
from aiofiles import os as async_os
from sanic.response import (
@@ -23,6 +25,7 @@ from sanic.response import (
from sanic.server import HttpProtocol
from sanic.testing import HOST, PORT
JSON_DATA = {"ok": True}
@@ -100,10 +103,14 @@ def test_response_content_length(app):
)
_, response = app.test_client.get("/response_with_space")
content_length_for_response_with_space = response.headers.get("Content-Length")
content_length_for_response_with_space = response.headers.get(
"Content-Length"
)
_, response = app.test_client.get("/response_without_space")
content_length_for_response_without_space = response.headers.get("Content-Length")
content_length_for_response_without_space = response.headers.get(
"Content-Length"
)
assert (
content_length_for_response_with_space
@@ -248,7 +255,9 @@ async def test_non_chunked_streaming_adds_correct_headers_asgi(
assert response.headers["Content-Length"] == "7"
def test_non_chunked_streaming_returns_correct_content(non_chunked_streaming_app,):
def test_non_chunked_streaming_returns_correct_content(
non_chunked_streaming_app,
):
request, response = non_chunked_streaming_app.test_client.get("/")
assert response.text == "foo,bar"
@@ -261,7 +270,9 @@ def test_stream_response_status_returns_correct_headers(status):
@pytest.mark.parametrize("keep_alive_timeout", [10, 20, 30])
def test_stream_response_keep_alive_returns_correct_headers(keep_alive_timeout,):
def test_stream_response_keep_alive_returns_correct_headers(
keep_alive_timeout,
):
response = StreamingHTTPResponse(sample_streaming_fn)
headers = response.get_headers(
keep_alive=True, keep_alive_timeout=keep_alive_timeout
@@ -345,9 +356,13 @@ def test_stream_response_writes_correct_content_to_transport_when_not_chunked(
@streaming_app.listener("after_server_start")
async def run_stream(app, loop):
await response.stream(version="1.0")
assert response.protocol.transport.write.call_args_list[1][0][0] == (b"foo,")
assert response.protocol.transport.write.call_args_list[1][0][0] == (
b"foo,"
)
assert response.protocol.transport.write.call_args_list[2][0][0] == (b"bar")
assert response.protocol.transport.write.call_args_list[2][0][0] == (
b"bar"
)
assert len(response.protocol.transport.write.call_args_list) == 3
@@ -392,7 +407,9 @@ def get_file_content(static_file_directory, file_name):
return file.read()
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt", "python.png"])
@pytest.mark.parametrize(
"file_name", ["test.file", "decode me.txt", "python.png"]
)
@pytest.mark.parametrize("status", [200, 401])
def test_file_response(app, file_name, static_file_directory, status):
@app.route("/files/<filename>", methods=["GET"])
@@ -419,7 +436,9 @@ def test_file_response(app, file_name, static_file_directory, status):
("python.png", "logo.png"),
],
)
def test_file_response_custom_filename(app, source, dest, static_file_directory):
def test_file_response_custom_filename(
app, source, dest, static_file_directory
):
@app.route("/files/<filename>", methods=["GET"])
def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
@@ -446,7 +465,8 @@ def test_file_head_response(app, file_name, static_file_directory):
headers["Content-Length"] = str(stats.st_size)
if request.method == "HEAD":
return HTTPResponse(
headers=headers, content_type=guess_type(file_path)[0] or "text/plain",
headers=headers,
content_type=guess_type(file_path)[0] or "text/plain",
)
else:
return file(
@@ -464,7 +484,9 @@ def test_file_head_response(app, file_name, static_file_directory):
)
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt", "python.png"])
@pytest.mark.parametrize(
"file_name", ["test.file", "decode me.txt", "python.png"]
)
def test_file_stream_response(app, file_name, static_file_directory):
@app.route("/files/<filename>", methods=["GET"])
def file_route(request, filename):
@@ -490,7 +512,9 @@ def test_file_stream_response(app, file_name, static_file_directory):
("python.png", "logo.png"),
],
)
def test_file_stream_response_custom_filename(app, source, dest, static_file_directory):
def test_file_stream_response_custom_filename(
app, source, dest, static_file_directory
):
@app.route("/files/<filename>", methods=["GET"])
def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
@@ -519,7 +543,8 @@ def test_file_stream_head_response(app, file_name, static_file_directory):
stats = await async_os.stat(file_path)
headers["Content-Length"] = str(stats.st_size)
return HTTPResponse(
headers=headers, content_type=guess_type(file_path)[0] or "text/plain",
headers=headers,
content_type=guess_type(file_path)[0] or "text/plain",
)
else:
return file_stream(
@@ -542,8 +567,12 @@ def test_file_stream_head_response(app, file_name, static_file_directory):
)
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt", "python.png"])
@pytest.mark.parametrize("size,start,end", [(1024, 0, 1024), (4096, 1024, 8192)])
@pytest.mark.parametrize(
"file_name", ["test.file", "decode me.txt", "python.png"]
)
@pytest.mark.parametrize(
"size,start,end", [(1024, 0, 1024), (4096, 1024, 8192)]
)
def test_file_stream_response_range(
app, file_name, static_file_directory, size, start, end
):