Resolve headers as body in ASGI mode

This commit is contained in:
Adam Hopkins
2020-10-25 10:40:08 +02:00
parent 2a44a27236
commit c09129ec63
3 changed files with 58 additions and 135 deletions

View File

@@ -1,7 +1,6 @@
import asyncio
import inspect
import os
from collections import namedtuple
from mimetypes import guess_type
from random import choice
@@ -9,7 +8,6 @@ from unittest.mock import MagicMock
from urllib.parse import unquote
import pytest
from aiofiles import os as async_os
from sanic.response import (
@@ -25,7 +23,6 @@ from sanic.response import (
from sanic.server import HttpProtocol
from sanic.testing import HOST, PORT
JSON_DATA = {"ok": True}
@@ -103,14 +100,10 @@ def test_response_content_length(app):
)
_, response = app.test_client.get("/response_with_space")
content_length_for_response_with_space = response.headers.get(
"Content-Length"
)
content_length_for_response_with_space = response.headers.get("Content-Length")
_, response = app.test_client.get("/response_without_space")
content_length_for_response_without_space = response.headers.get(
"Content-Length"
)
content_length_for_response_without_space = response.headers.get("Content-Length")
assert (
content_length_for_response_with_space
@@ -232,6 +225,12 @@ def test_chunked_streaming_returns_correct_content(streaming_app):
assert response.text == "foo,bar"
@pytest.mark.asyncio
async def test_chunked_streaming_returns_correct_content_asgi(streaming_app):
request, response = await streaming_app.asgi_client.get("/")
assert response.text == "4\r\nfoo,\r\n3\r\nbar\r\n0\r\n\r\n"
def test_non_chunked_streaming_adds_correct_headers(non_chunked_streaming_app):
request, response = non_chunked_streaming_app.test_client.get("/")
assert "Transfer-Encoding" not in response.headers
@@ -239,9 +238,17 @@ def test_non_chunked_streaming_adds_correct_headers(non_chunked_streaming_app):
assert response.headers["Content-Length"] == "7"
def test_non_chunked_streaming_returns_correct_content(
@pytest.mark.asyncio
async def test_non_chunked_streaming_adds_correct_headers_asgi(
non_chunked_streaming_app,
):
request, response = await non_chunked_streaming_app.asgi_client.get("/")
assert "Transfer-Encoding" not in response.headers
assert response.headers["Content-Type"] == "text/csv"
assert response.headers["Content-Length"] == "7"
def test_non_chunked_streaming_returns_correct_content(non_chunked_streaming_app,):
request, response = non_chunked_streaming_app.test_client.get("/")
assert response.text == "foo,bar"
@@ -254,9 +261,7 @@ def test_stream_response_status_returns_correct_headers(status):
@pytest.mark.parametrize("keep_alive_timeout", [10, 20, 30])
def test_stream_response_keep_alive_returns_correct_headers(
keep_alive_timeout,
):
def test_stream_response_keep_alive_returns_correct_headers(keep_alive_timeout,):
response = StreamingHTTPResponse(sample_streaming_fn)
headers = response.get_headers(
keep_alive=True, keep_alive_timeout=keep_alive_timeout
@@ -340,13 +345,9 @@ def test_stream_response_writes_correct_content_to_transport_when_not_chunked(
@streaming_app.listener("after_server_start")
async def run_stream(app, loop):
await response.stream(version="1.0")
assert response.protocol.transport.write.call_args_list[1][0][0] == (
b"foo,"
)
assert response.protocol.transport.write.call_args_list[1][0][0] == (b"foo,")
assert response.protocol.transport.write.call_args_list[2][0][0] == (
b"bar"
)
assert response.protocol.transport.write.call_args_list[2][0][0] == (b"bar")
assert len(response.protocol.transport.write.call_args_list) == 3
@@ -391,9 +392,7 @@ def get_file_content(static_file_directory, file_name):
return file.read()
@pytest.mark.parametrize(
"file_name", ["test.file", "decode me.txt", "python.png"]
)
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt", "python.png"])
@pytest.mark.parametrize("status", [200, 401])
def test_file_response(app, file_name, static_file_directory, status):
@app.route("/files/<filename>", methods=["GET"])
@@ -420,9 +419,7 @@ def test_file_response(app, file_name, static_file_directory, status):
("python.png", "logo.png"),
],
)
def test_file_response_custom_filename(
app, source, dest, static_file_directory
):
def test_file_response_custom_filename(app, source, dest, static_file_directory):
@app.route("/files/<filename>", methods=["GET"])
def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
@@ -449,8 +446,7 @@ def test_file_head_response(app, file_name, static_file_directory):
headers["Content-Length"] = str(stats.st_size)
if request.method == "HEAD":
return HTTPResponse(
headers=headers,
content_type=guess_type(file_path)[0] or "text/plain",
headers=headers, content_type=guess_type(file_path)[0] or "text/plain",
)
else:
return file(
@@ -468,9 +464,7 @@ def test_file_head_response(app, file_name, static_file_directory):
)
@pytest.mark.parametrize(
"file_name", ["test.file", "decode me.txt", "python.png"]
)
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt", "python.png"])
def test_file_stream_response(app, file_name, static_file_directory):
@app.route("/files/<filename>", methods=["GET"])
def file_route(request, filename):
@@ -496,9 +490,7 @@ def test_file_stream_response(app, file_name, static_file_directory):
("python.png", "logo.png"),
],
)
def test_file_stream_response_custom_filename(
app, source, dest, static_file_directory
):
def test_file_stream_response_custom_filename(app, source, dest, static_file_directory):
@app.route("/files/<filename>", methods=["GET"])
def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
@@ -527,8 +519,7 @@ def test_file_stream_head_response(app, file_name, static_file_directory):
stats = await async_os.stat(file_path)
headers["Content-Length"] = str(stats.st_size)
return HTTPResponse(
headers=headers,
content_type=guess_type(file_path)[0] or "text/plain",
headers=headers, content_type=guess_type(file_path)[0] or "text/plain",
)
else:
return file_stream(
@@ -551,12 +542,8 @@ def test_file_stream_head_response(app, file_name, static_file_directory):
)
@pytest.mark.parametrize(
"file_name", ["test.file", "decode me.txt", "python.png"]
)
@pytest.mark.parametrize(
"size,start,end", [(1024, 0, 1024), (4096, 1024, 8192)]
)
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt", "python.png"])
@pytest.mark.parametrize("size,start,end", [(1024, 0, 1024), (4096, 1024, 8192)])
def test_file_stream_response_range(
app, file_name, static_file_directory, size, start, end
):