From 5c5656f98187b7fbbb0476af62ff4b0954bda83f Mon Sep 17 00:00:00 2001 From: Ashley Sommer Date: Sat, 20 May 2017 09:41:36 +1000 Subject: [PATCH] Moved file_stream tests to test_responses.py --- tests/test_file_response.py | 70 ------------------ tests/test_file_stream_response.py | 77 ------------------- tests/test_response.py | 115 ++++++++++++++++++++++++++++- 3 files changed, 113 insertions(+), 149 deletions(-) delete mode 100644 tests/test_file_response.py delete mode 100644 tests/test_file_stream_response.py diff --git a/tests/test_file_response.py b/tests/test_file_response.py deleted file mode 100644 index 7574215b..00000000 --- a/tests/test_file_response.py +++ /dev/null @@ -1,70 +0,0 @@ -import inspect -import os -from mimetypes import guess_type -from urllib.parse import unquote -from aiofiles import os as async_os -import pytest - -from sanic import Sanic -from sanic.response import file, HTTPResponse - - -@pytest.fixture(scope='module') -def static_file_directory(): - """The static directory to serve""" - current_file = inspect.getfile(inspect.currentframe()) - current_directory = os.path.dirname(os.path.abspath(current_file)) - static_directory = os.path.join(current_directory, 'static') - return static_directory - - -def get_file_path(static_file_directory, file_name): - return os.path.join(static_file_directory, file_name) - - -def get_file_content(static_file_directory, file_name): - """The content of the static file to check""" - with open(get_file_path(static_file_directory, file_name), 'rb') as file: - return file.read() - - -@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png']) -def test_file_helper(file_name, static_file_directory): - app = Sanic('test_file_helper') - @app.route('/files/', methods=['GET']) - def file_route(request, filename): - file_path = os.path.join(static_file_directory, filename) - file_path = os.path.abspath(unquote(file_path)) - return file(file_path, mime_type=guess_type(file_path)[0] or 'text/plain') - - request, response = app.test_client.get('/files/{}'.format(file_name)) - assert response.status == 200 - assert response.body == get_file_content(static_file_directory, file_name) - - -@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt']) -def test_file_helper_head_request(file_name, static_file_directory): - app = Sanic('test_file_helper') - @app.route('/files/', methods=['GET', 'HEAD']) - async def file_route(request, filename): - file_path = os.path.join(static_file_directory, filename) - file_path = os.path.abspath(unquote(file_path)) - stats = await async_os.stat(file_path) - headers = dict() - headers['Accept-Ranges'] = 'bytes' - headers['Content-Length'] = str(stats.st_size) - if request.method == "HEAD": - return HTTPResponse( - headers=headers, - content_type=guess_type(file_path)[0] or 'text/plain') - else: - return file(file_path, headers=headers, - mime_type=guess_type(file_path)[0] or 'text/plain') - - request, response = app.test_client.head('/files/{}'.format(file_name)) - assert response.status == 200 - assert 'Accept-Ranges' in response.headers - assert 'Content-Length' in response.headers - assert int(response.headers[ - 'Content-Length']) == len( - get_file_content(static_file_directory, file_name)) diff --git a/tests/test_file_stream_response.py b/tests/test_file_stream_response.py deleted file mode 100644 index 2a79a683..00000000 --- a/tests/test_file_stream_response.py +++ /dev/null @@ -1,77 +0,0 @@ -import inspect -import os -from mimetypes import guess_type -from urllib.parse import unquote -from aiofiles import os as async_os -import pytest - -from sanic import Sanic -from sanic.response import file_stream, HTTPResponse - - -@pytest.fixture(scope='module') -def static_file_directory(): - """The static directory to serve""" - current_file = inspect.getfile(inspect.currentframe()) - current_directory = os.path.dirname(os.path.abspath(current_file)) - static_directory = os.path.join(current_directory, 'static') - return static_directory - - -def get_file_path(static_file_directory, file_name): - return os.path.join(static_file_directory, file_name) - - -def get_file_content(static_file_directory, file_name): - """The content of the static file to check""" - with open(get_file_path(static_file_directory, file_name), 'rb') as file: - return file.read() - - -@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png']) -def test_file_stream_helper(file_name, static_file_directory): - app = Sanic('test_file_helper') - @app.route('/files/', methods=['GET']) - def file_route(request, filename): - file_path = os.path.join(static_file_directory, filename) - file_path = os.path.abspath(unquote(file_path)) - return file_stream(file_path, chunk_size=32, - mime_type=guess_type(file_path)[0] or 'text/plain') - - request, response = app.test_client.get('/files/{}'.format(file_name)) - assert response.status == 200 - assert response.body == get_file_content(static_file_directory, file_name) - - -@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt']) -def test_file_helper_head_request(file_name, static_file_directory): - app = Sanic('test_file_helper') - @app.route('/files/', methods=['GET', 'HEAD']) - async def file_route(request, filename): - file_path = os.path.join(static_file_directory, filename) - file_path = os.path.abspath(unquote(file_path)) - headers = dict() - headers['Accept-Ranges'] = 'bytes' - if request.method == "HEAD": - # Return a normal HTTPResponse, not a - # StreamingHTTPResponse for a HEAD request - stats = await async_os.stat(file_path) - headers['Content-Length'] = str(stats.st_size) - return HTTPResponse( - headers=headers, - content_type=guess_type(file_path)[0] or 'text/plain') - else: - return file_stream(file_path, chunk_size=32, headers=headers, - mime_type=guess_type(file_path)[0] or 'text/plain') - - request, response = app.test_client.head('/files/{}'.format(file_name)) - assert response.status == 200 - # A HEAD request should never be streamed/chunked. - if 'Transfer-Encoding' in response.headers: - assert response.headers['Transfer-Encoding'] != "chunked" - assert 'Accept-Ranges' in response.headers - # A HEAD request should get the Content-Length too - assert 'Content-Length' in response.headers - assert int(response.headers[ - 'Content-Length']) == len( - get_file_content(static_file_directory, file_name)) diff --git a/tests/test_response.py b/tests/test_response.py index ff5fd42b..ba87f68d 100644 --- a/tests/test_response.py +++ b/tests/test_response.py @@ -1,13 +1,20 @@ import asyncio +import inspect +import os +from aiofiles import os as async_os +from mimetypes import guess_type +from urllib.parse import unquote + import pytest from random import choice from sanic import Sanic -from sanic.response import HTTPResponse, stream, StreamingHTTPResponse +from sanic.response import HTTPResponse, stream, StreamingHTTPResponse, file, file_stream from sanic.testing import HOST, PORT - from unittest.mock import MagicMock + + def test_response_body_not_a_string(): """Test when a response body sent from the application is not a string""" app = Sanic('response_body_not_a_string') @@ -94,3 +101,107 @@ def test_stream_response_writes_correct_content_to_transport(streaming_app): app.stop() streaming_app.run(host=HOST, port=PORT) + + +@pytest.fixture +def static_file_directory(): + """The static directory to serve""" + current_file = inspect.getfile(inspect.currentframe()) + current_directory = os.path.dirname(os.path.abspath(current_file)) + static_directory = os.path.join(current_directory, 'static') + return static_directory + + +def get_file_content(static_file_directory, file_name): + """The content of the static file to check""" + with open(os.path.join(static_file_directory, file_name), 'rb') as file: + return file.read() + +@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png']) +def test_file_response(file_name, static_file_directory): + app = Sanic('test_file_helper') + @app.route('/files/', methods=['GET']) + def file_route(request, filename): + file_path = os.path.join(static_file_directory, filename) + file_path = os.path.abspath(unquote(file_path)) + return file(file_path, mime_type=guess_type(file_path)[0] or 'text/plain') + + request, response = app.test_client.get('/files/{}'.format(file_name)) + assert response.status == 200 + assert response.body == get_file_content(static_file_directory, file_name) + + +@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt']) +def test_file_head_response(file_name, static_file_directory): + app = Sanic('test_file_helper') + @app.route('/files/', methods=['GET', 'HEAD']) + async def file_route(request, filename): + file_path = os.path.join(static_file_directory, filename) + file_path = os.path.abspath(unquote(file_path)) + stats = await async_os.stat(file_path) + headers = dict() + headers['Accept-Ranges'] = 'bytes' + headers['Content-Length'] = str(stats.st_size) + if request.method == "HEAD": + return HTTPResponse( + headers=headers, + content_type=guess_type(file_path)[0] or 'text/plain') + else: + return file(file_path, headers=headers, + mime_type=guess_type(file_path)[0] or 'text/plain') + + request, response = app.test_client.head('/files/{}'.format(file_name)) + assert response.status == 200 + assert 'Accept-Ranges' in response.headers + assert 'Content-Length' in response.headers + assert int(response.headers[ + 'Content-Length']) == len( + get_file_content(static_file_directory, file_name)) + +@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png']) +def test_file_stream_response(file_name, static_file_directory): + app = Sanic('test_file_helper') + @app.route('/files/', methods=['GET']) + def file_route(request, filename): + file_path = os.path.join(static_file_directory, filename) + file_path = os.path.abspath(unquote(file_path)) + return file_stream(file_path, chunk_size=32, + mime_type=guess_type(file_path)[0] or 'text/plain') + + request, response = app.test_client.get('/files/{}'.format(file_name)) + assert response.status == 200 + assert response.body == get_file_content(static_file_directory, file_name) + + +@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt']) +def test_file_stream_head_response(file_name, static_file_directory): + app = Sanic('test_file_helper') + @app.route('/files/', methods=['GET', 'HEAD']) + async def file_route(request, filename): + file_path = os.path.join(static_file_directory, filename) + file_path = os.path.abspath(unquote(file_path)) + headers = dict() + headers['Accept-Ranges'] = 'bytes' + if request.method == "HEAD": + # Return a normal HTTPResponse, not a + # StreamingHTTPResponse for a HEAD request + stats = await async_os.stat(file_path) + headers['Content-Length'] = str(stats.st_size) + return HTTPResponse( + headers=headers, + content_type=guess_type(file_path)[0] or 'text/plain') + else: + return file_stream(file_path, chunk_size=32, headers=headers, + mime_type=guess_type(file_path)[0] or 'text/plain') + + request, response = app.test_client.head('/files/{}'.format(file_name)) + assert response.status == 200 + # A HEAD request should never be streamed/chunked. + if 'Transfer-Encoding' in response.headers: + assert response.headers['Transfer-Encoding'] != "chunked" + assert 'Accept-Ranges' in response.headers + # A HEAD request should get the Content-Length too + assert 'Content-Length' in response.headers + assert int(response.headers[ + 'Content-Length']) == len( + get_file_content(static_file_directory, file_name)) \ No newline at end of file