Run linter

This commit is contained in:
Adam Hopkins
2018-12-30 13:18:06 +02:00
parent 040468755c
commit 05dd3b2e9d
47 changed files with 3092 additions and 2852 deletions

View File

@@ -10,127 +10,128 @@ from urllib.parse import unquote
import pytest
from aiofiles import os as async_os
from sanic.response import (HTTPResponse, StreamingHTTPResponse, file,
file_stream, json, raw, stream, text)
from sanic.response import (
HTTPResponse,
StreamingHTTPResponse,
file,
file_stream,
json,
raw,
stream,
text,
)
from sanic.server import HttpProtocol
from sanic.testing import HOST, PORT
JSON_DATA = {'ok': True}
JSON_DATA = {"ok": True}
def test_response_body_not_a_string(app):
"""Test when a response body sent from the application is not a string"""
random_num = choice(range(1000))
@app.route('/hello')
@app.route("/hello")
async def hello_route(request):
return HTTPResponse(body=random_num)
request, response = app.test_client.get('/hello')
request, response = app.test_client.get("/hello")
assert response.text == str(random_num)
async def sample_streaming_fn(response):
await response.write('foo,')
await asyncio.sleep(.001)
await response.write('bar')
await response.write("foo,")
await asyncio.sleep(0.001)
await response.write("bar")
def test_method_not_allowed(app):
@app.get('/')
@app.get("/")
async def test_get(request):
return response.json({'hello': 'world'})
return response.json({"hello": "world"})
request, response = app.test_client.head('/')
assert response.headers['Allow'] == 'GET'
request, response = app.test_client.head("/")
assert response.headers["Allow"] == "GET"
request, response = app.test_client.post('/')
assert response.headers['Allow'] == 'GET'
request, response = app.test_client.post("/")
assert response.headers["Allow"] == "GET"
@app.post('/')
@app.post("/")
async def test_post(request):
return response.json({'hello': 'world'})
return response.json({"hello": "world"})
request, response = app.test_client.head('/')
request, response = app.test_client.head("/")
assert response.status == 405
assert set(response.headers['Allow'].split(', ')) == {'GET', 'POST'}
assert response.headers['Content-Length'] == '0'
assert set(response.headers["Allow"].split(", ")) == {"GET", "POST"}
assert response.headers["Content-Length"] == "0"
request, response = app.test_client.patch('/')
request, response = app.test_client.patch("/")
assert response.status == 405
assert set(response.headers['Allow'].split(', ')) == {'GET', 'POST'}
assert response.headers['Content-Length'] == '0'
assert set(response.headers["Allow"].split(", ")) == {"GET", "POST"}
assert response.headers["Content-Length"] == "0"
def test_response_header(app):
@app.get('/')
@app.get("/")
async def test(request):
return json({
"ok": True
}, headers={
'CONTENT-TYPE': 'application/json'
})
return json({"ok": True}, headers={"CONTENT-TYPE": "application/json"})
request, response = app.test_client.get('/')
request, response = app.test_client.get("/")
assert dict(response.headers) == {
'Connection': 'keep-alive',
'Keep-Alive': str(app.config.KEEP_ALIVE_TIMEOUT),
'Content-Length': '11',
'Content-Type': 'application/json',
"Connection": "keep-alive",
"Keep-Alive": str(app.config.KEEP_ALIVE_TIMEOUT),
"Content-Length": "11",
"Content-Type": "application/json",
}
def test_response_content_length(app):
@app.get("/response_with_space")
async def response_with_space(request):
return json({
"message": "Data",
"details": "Some Details"
}, headers={
'CONTENT-TYPE': 'application/json'
})
return json(
{"message": "Data", "details": "Some Details"},
headers={"CONTENT-TYPE": "application/json"},
)
@app.get("/response_without_space")
async def response_without_space(request):
return json({
"message":"Data",
"details":"Some Details"
}, headers={
'CONTENT-TYPE': 'application/json'
})
return json(
{"message": "Data", "details": "Some Details"},
headers={"CONTENT-TYPE": "application/json"},
)
_, response = app.test_client.get("/response_with_space")
content_length_for_response_with_space = response.headers.get("Content-Length")
content_length_for_response_with_space = response.headers.get(
"Content-Length"
)
_, response = app.test_client.get("/response_without_space")
content_length_for_response_without_space = response.headers.get("Content-Length")
content_length_for_response_without_space = response.headers.get(
"Content-Length"
)
assert content_length_for_response_with_space == content_length_for_response_without_space
assert (
content_length_for_response_with_space
== content_length_for_response_without_space
)
assert content_length_for_response_with_space == '43'
assert content_length_for_response_with_space == "43"
def test_response_content_length_with_different_data_types(app):
@app.get("/")
async def get_data_with_different_types(request):
# Indentation issues in the Response is intentional. Please do not fix
return json({
'bool': True,
'none': None,
'string':'string',
'number': -1},
headers={
'CONTENT-TYPE': 'application/json'
})
return json(
{"bool": True, "none": None, "string": "string", "number": -1},
headers={"CONTENT-TYPE": "application/json"},
)
_, response = app.test_client.get("/")
assert response.headers.get("Content-Length") == '55'
assert response.headers.get("Content-Length") == "55"
@pytest.fixture
def json_app(app):
@app.route("/")
async def test(request):
return json(JSON_DATA)
@@ -156,70 +157,72 @@ def json_app(app):
def test_json_response(json_app):
from sanic.response import json_dumps
request, response = json_app.test_client.get('/')
request, response = json_app.test_client.get("/")
assert response.status == 200
assert response.text == json_dumps(JSON_DATA)
assert response.json == JSON_DATA
def test_no_content(json_app):
request, response = json_app.test_client.get('/no-content')
request, response = json_app.test_client.get("/no-content")
assert response.status == 204
assert response.text == ''
assert 'Content-Length' not in response.headers
assert response.text == ""
assert "Content-Length" not in response.headers
request, response = json_app.test_client.get('/no-content/unmodified')
request, response = json_app.test_client.get("/no-content/unmodified")
assert response.status == 304
assert response.text == ''
assert 'Content-Length' not in response.headers
assert 'Content-Type' not in response.headers
assert response.text == ""
assert "Content-Length" not in response.headers
assert "Content-Type" not in response.headers
request, response = json_app.test_client.get('/unmodified')
request, response = json_app.test_client.get("/unmodified")
assert response.status == 304
assert response.text == ''
assert 'Content-Length' not in response.headers
assert 'Content-Type' not in response.headers
assert response.text == ""
assert "Content-Length" not in response.headers
assert "Content-Type" not in response.headers
request, response = json_app.test_client.delete('/')
request, response = json_app.test_client.delete("/")
assert response.status == 204
assert response.text == ''
assert 'Content-Length' not in response.headers
assert response.text == ""
assert "Content-Length" not in response.headers
@pytest.fixture
def streaming_app(app):
@app.route("/")
async def test(request):
return stream(sample_streaming_fn, content_type='text/csv')
return stream(sample_streaming_fn, content_type="text/csv")
return app
def test_streaming_adds_correct_headers(streaming_app):
request, response = streaming_app.test_client.get('/')
assert response.headers['Transfer-Encoding'] == 'chunked'
assert response.headers['Content-Type'] == 'text/csv'
request, response = streaming_app.test_client.get("/")
assert response.headers["Transfer-Encoding"] == "chunked"
assert response.headers["Content-Type"] == "text/csv"
def test_streaming_returns_correct_content(streaming_app):
request, response = streaming_app.test_client.get('/')
assert response.text == 'foo,bar'
request, response = streaming_app.test_client.get("/")
assert response.text == "foo,bar"
@pytest.mark.parametrize('status', [200, 201, 400, 401])
@pytest.mark.parametrize("status", [200, 201, 400, 401])
def test_stream_response_status_returns_correct_headers(status):
response = StreamingHTTPResponse(sample_streaming_fn, status=status)
headers = response.get_headers()
assert b"HTTP/1.1 %s" % str(status).encode() in headers
@pytest.mark.parametrize('keep_alive_timeout', [10, 20, 30])
@pytest.mark.parametrize("keep_alive_timeout", [10, 20, 30])
def test_stream_response_keep_alive_returns_correct_headers(
keep_alive_timeout):
keep_alive_timeout
):
response = StreamingHTTPResponse(sample_streaming_fn)
headers = response.get_headers(
keep_alive=True, keep_alive_timeout=keep_alive_timeout)
keep_alive=True, keep_alive_timeout=keep_alive_timeout
)
assert b"Keep-Alive: %s\r\n" % str(keep_alive_timeout).encode() in headers
@@ -244,19 +247,19 @@ def test_stream_response_writes_correct_content_to_transport(streaming_app):
response.protocol.push_data = mock_push_data
response.protocol.drain = mock_drain
@streaming_app.listener('after_server_start')
@streaming_app.listener("after_server_start")
async def run_stream(app, loop):
await response.stream()
assert response.protocol.transport.write.call_args_list[1][0][0] == (
b'4\r\nfoo,\r\n'
b"4\r\nfoo,\r\n"
)
assert response.protocol.transport.write.call_args_list[2][0][0] == (
b'3\r\nbar\r\n'
b"3\r\nbar\r\n"
)
assert response.protocol.transport.write.call_args_list[3][0][0] == (
b'0\r\n\r\n'
b"0\r\n\r\n"
)
app.stop()
@@ -265,25 +268,23 @@ def test_stream_response_writes_correct_content_to_transport(streaming_app):
def test_stream_response_with_cookies(app):
@app.route("/")
async def test(request):
response = stream(sample_streaming_fn, content_type='text/csv')
response.cookies['test'] = 'modified'
response.cookies['test'] = 'pass'
response = stream(sample_streaming_fn, content_type="text/csv")
response.cookies["test"] = "modified"
response.cookies["test"] = "pass"
return response
request, response = app.test_client.get('/')
assert response.cookies['test'].value == 'pass'
request, response = app.test_client.get("/")
assert response.cookies["test"].value == "pass"
def test_stream_response_without_cookies(app):
@app.route("/")
async def test(request):
return stream(sample_streaming_fn, content_type='text/csv')
return stream(sample_streaming_fn, content_type="text/csv")
request, response = app.test_client.get('/')
request, response = app.test_client.get("/")
assert response.cookies == {}
@@ -292,195 +293,214 @@ def static_file_directory():
"""The static directory to serve"""
current_file = inspect.getfile(inspect.currentframe())
current_directory = os.path.dirname(os.path.abspath(current_file))
static_directory = os.path.join(current_directory, 'static')
static_directory = os.path.join(current_directory, "static")
return static_directory
def get_file_content(static_file_directory, file_name):
"""The content of the static file to check"""
with open(os.path.join(static_file_directory, file_name), 'rb') as file:
with open(os.path.join(static_file_directory, file_name), "rb") as file:
return file.read()
@pytest.mark.parametrize('file_name',
['test.file', 'decode me.txt', 'python.png'])
@pytest.mark.parametrize('status', [200, 401])
@pytest.mark.parametrize(
"file_name", ["test.file", "decode me.txt", "python.png"]
)
@pytest.mark.parametrize("status", [200, 401])
def test_file_response(app, file_name, static_file_directory, status):
@app.route('/files/<filename>', methods=['GET'])
@app.route("/files/<filename>", methods=["GET"])
def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
file_path = os.path.abspath(unquote(file_path))
return file(file_path, status=status,
mime_type=guess_type(file_path)[0] or 'text/plain')
return file(
file_path,
status=status,
mime_type=guess_type(file_path)[0] or "text/plain",
)
request, response = app.test_client.get('/files/{}'.format(file_name))
request, response = app.test_client.get("/files/{}".format(file_name))
assert response.status == status
assert response.body == get_file_content(static_file_directory, file_name)
assert 'Content-Disposition' not in response.headers
assert "Content-Disposition" not in response.headers
@pytest.mark.parametrize(
'source,dest',
"source,dest",
[
('test.file', 'my_file.txt'), ('decode me.txt', 'readme.md'),
('python.png', 'logo.png')
]
("test.file", "my_file.txt"),
("decode me.txt", "readme.md"),
("python.png", "logo.png"),
],
)
def test_file_response_custom_filename(app, source, dest,
static_file_directory):
@app.route('/files/<filename>', methods=['GET'])
def test_file_response_custom_filename(
app, source, dest, static_file_directory
):
@app.route("/files/<filename>", methods=["GET"])
def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
file_path = os.path.abspath(unquote(file_path))
return file(file_path, filename=dest)
request, response = app.test_client.get('/files/{}'.format(source))
request, response = app.test_client.get("/files/{}".format(source))
assert response.status == 200
assert response.body == get_file_content(static_file_directory, source)
assert response.headers['Content-Disposition'] == \
'attachment; filename="{}"'.format(dest)
assert response.headers[
"Content-Disposition"
] == 'attachment; filename="{}"'.format(dest)
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt"])
def test_file_head_response(app, file_name, static_file_directory):
@app.route('/files/<filename>', methods=['GET', 'HEAD'])
@app.route("/files/<filename>", methods=["GET", "HEAD"])
async def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
file_path = os.path.abspath(unquote(file_path))
stats = await async_os.stat(file_path)
headers = dict()
headers['Accept-Ranges'] = 'bytes'
headers['Content-Length'] = str(stats.st_size)
headers["Accept-Ranges"] = "bytes"
headers["Content-Length"] = str(stats.st_size)
if request.method == "HEAD":
return HTTPResponse(
headers=headers,
content_type=guess_type(file_path)[0] or 'text/plain')
content_type=guess_type(file_path)[0] or "text/plain",
)
else:
return file(file_path, headers=headers,
mime_type=guess_type(file_path)[0] or 'text/plain')
return file(
file_path,
headers=headers,
mime_type=guess_type(file_path)[0] or "text/plain",
)
request, response = app.test_client.head('/files/{}'.format(file_name))
request, response = app.test_client.head("/files/{}".format(file_name))
assert response.status == 200
assert 'Accept-Ranges' in response.headers
assert 'Content-Length' in response.headers
assert int(response.headers[
'Content-Length']) == len(
get_file_content(static_file_directory, file_name))
@pytest.mark.parametrize('file_name',
['test.file', 'decode me.txt', 'python.png'])
def test_file_stream_response(app, file_name, static_file_directory):
@app.route('/files/<filename>', methods=['GET'])
def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
file_path = os.path.abspath(unquote(file_path))
return file_stream(file_path, chunk_size=32,
mime_type=guess_type(file_path)[0] or 'text/plain')
request, response = app.test_client.get('/files/{}'.format(file_name))
assert response.status == 200
assert response.body == get_file_content(static_file_directory, file_name)
assert 'Content-Disposition' not in response.headers
assert "Accept-Ranges" in response.headers
assert "Content-Length" in response.headers
assert int(response.headers["Content-Length"]) == len(
get_file_content(static_file_directory, file_name)
)
@pytest.mark.parametrize(
'source,dest',
[
('test.file', 'my_file.txt'), ('decode me.txt', 'readme.md'),
('python.png', 'logo.png')
]
"file_name", ["test.file", "decode me.txt", "python.png"]
)
def test_file_stream_response_custom_filename(app, source, dest,
static_file_directory):
@app.route('/files/<filename>', methods=['GET'])
def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
file_path = os.path.abspath(unquote(file_path))
return file_stream(file_path, chunk_size=32, filename=dest)
request, response = app.test_client.get('/files/{}'.format(source))
assert response.status == 200
assert response.body == get_file_content(static_file_directory, source)
assert response.headers['Content-Disposition'] == \
'attachment; filename="{}"'.format(dest)
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
def test_file_stream_head_response(app, file_name, static_file_directory):
@app.route('/files/<filename>', methods=['GET', 'HEAD'])
async def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
file_path = os.path.abspath(unquote(file_path))
headers = dict()
headers['Accept-Ranges'] = 'bytes'
if request.method == "HEAD":
# Return a normal HTTPResponse, not a
# StreamingHTTPResponse for a HEAD request
stats = await async_os.stat(file_path)
headers['Content-Length'] = str(stats.st_size)
return HTTPResponse(
headers=headers,
content_type=guess_type(file_path)[0] or 'text/plain')
else:
return file_stream(
file_path, chunk_size=32, headers=headers,
mime_type=guess_type(file_path)[0] or 'text/plain'
)
request, response = app.test_client.head('/files/{}'.format(file_name))
assert response.status == 200
# A HEAD request should never be streamed/chunked.
if 'Transfer-Encoding' in response.headers:
assert response.headers['Transfer-Encoding'] != "chunked"
assert 'Accept-Ranges' in response.headers
# A HEAD request should get the Content-Length too
assert 'Content-Length' in response.headers
assert int(response.headers[
'Content-Length']) == len(
get_file_content(static_file_directory, file_name))
@pytest.mark.parametrize('file_name',
['test.file', 'decode me.txt', 'python.png'])
@pytest.mark.parametrize('size,start,end', [
(1024, 0, 1024),
(4096, 1024, 8192),
])
def test_file_stream_response_range(app, file_name, static_file_directory, size, start, end):
Range = namedtuple('Range', ['size', 'start', 'end', 'total'])
total = len(get_file_content(static_file_directory, file_name))
range = Range(size=size, start=start, end=end, total=total)
@app.route('/files/<filename>', methods=['GET'])
def test_file_stream_response(app, file_name, static_file_directory):
@app.route("/files/<filename>", methods=["GET"])
def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
file_path = os.path.abspath(unquote(file_path))
return file_stream(
file_path,
chunk_size=32,
mime_type=guess_type(file_path)[0] or 'text/plain',
_range=range)
mime_type=guess_type(file_path)[0] or "text/plain",
)
request, response = app.test_client.get('/files/{}'.format(file_name))
request, response = app.test_client.get("/files/{}".format(file_name))
assert response.status == 200
assert response.body == get_file_content(static_file_directory, file_name)
assert "Content-Disposition" not in response.headers
@pytest.mark.parametrize(
"source,dest",
[
("test.file", "my_file.txt"),
("decode me.txt", "readme.md"),
("python.png", "logo.png"),
],
)
def test_file_stream_response_custom_filename(
app, source, dest, static_file_directory
):
@app.route("/files/<filename>", methods=["GET"])
def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
file_path = os.path.abspath(unquote(file_path))
return file_stream(file_path, chunk_size=32, filename=dest)
request, response = app.test_client.get("/files/{}".format(source))
assert response.status == 200
assert response.body == get_file_content(static_file_directory, source)
assert response.headers[
"Content-Disposition"
] == 'attachment; filename="{}"'.format(dest)
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt"])
def test_file_stream_head_response(app, file_name, static_file_directory):
@app.route("/files/<filename>", methods=["GET", "HEAD"])
async def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
file_path = os.path.abspath(unquote(file_path))
headers = dict()
headers["Accept-Ranges"] = "bytes"
if request.method == "HEAD":
# Return a normal HTTPResponse, not a
# StreamingHTTPResponse for a HEAD request
stats = await async_os.stat(file_path)
headers["Content-Length"] = str(stats.st_size)
return HTTPResponse(
headers=headers,
content_type=guess_type(file_path)[0] or "text/plain",
)
else:
return file_stream(
file_path,
chunk_size=32,
headers=headers,
mime_type=guess_type(file_path)[0] or "text/plain",
)
request, response = app.test_client.head("/files/{}".format(file_name))
assert response.status == 200
# A HEAD request should never be streamed/chunked.
if "Transfer-Encoding" in response.headers:
assert response.headers["Transfer-Encoding"] != "chunked"
assert "Accept-Ranges" in response.headers
# A HEAD request should get the Content-Length too
assert "Content-Length" in response.headers
assert int(response.headers["Content-Length"]) == len(
get_file_content(static_file_directory, file_name)
)
@pytest.mark.parametrize(
"file_name", ["test.file", "decode me.txt", "python.png"]
)
@pytest.mark.parametrize(
"size,start,end", [(1024, 0, 1024), (4096, 1024, 8192)]
)
def test_file_stream_response_range(
app, file_name, static_file_directory, size, start, end
):
Range = namedtuple("Range", ["size", "start", "end", "total"])
total = len(get_file_content(static_file_directory, file_name))
range = Range(size=size, start=start, end=end, total=total)
@app.route("/files/<filename>", methods=["GET"])
def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
file_path = os.path.abspath(unquote(file_path))
return file_stream(
file_path,
chunk_size=32,
mime_type=guess_type(file_path)[0] or "text/plain",
_range=range,
)
request, response = app.test_client.get("/files/{}".format(file_name))
assert response.status == 206
assert 'Content-Range' in response.headers
assert response.headers['Content-Range'] == 'bytes {}-{}/{}'.format(range.start, range.end, range.total)
assert "Content-Range" in response.headers
assert response.headers["Content-Range"] == "bytes {}-{}/{}".format(
range.start, range.end, range.total
)
def test_raw_response(app):
@app.get('/test')
@app.get("/test")
def handler(request):
return raw(b'raw_response')
return raw(b"raw_response")
request, response = app.test_client.get('/test')
assert response.content_type == 'application/octet-stream'
assert response.body == b'raw_response'
request, response = app.test_client.get("/test")
assert response.content_type == "application/octet-stream"
assert response.body == b"raw_response"