Merge pull request #1378 from hramezani/fix_some_lint_error
Fix some test files lint errors.
This commit is contained in:
commit
5f486cc25f
|
@ -3,6 +3,7 @@ import asyncio
|
||||||
|
|
||||||
def test_bad_request_response(app):
|
def test_bad_request_response(app):
|
||||||
lines = []
|
lines = []
|
||||||
|
|
||||||
@app.listener('after_server_start')
|
@app.listener('after_server_start')
|
||||||
async def _request(sanic, loop):
|
async def _request(sanic, loop):
|
||||||
connect = asyncio.open_connection('127.0.0.1', 42101)
|
connect = asyncio.open_connection('127.0.0.1', 42101)
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from http.cookies import SimpleCookie
|
from http.cookies import SimpleCookie
|
||||||
from sanic import Sanic
|
from sanic.response import text
|
||||||
from sanic.response import json, text
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
@ -62,6 +61,7 @@ def test_false_cookies(app, httponly, expected):
|
||||||
|
|
||||||
assert ('HttpOnly' in response_cookies['right_back'].output()) == expected
|
assert ('HttpOnly' in response_cookies['right_back'].output()) == expected
|
||||||
|
|
||||||
|
|
||||||
def test_http2_cookies(app):
|
def test_http2_cookies(app):
|
||||||
|
|
||||||
@app.route('/')
|
@app.route('/')
|
||||||
|
@ -74,6 +74,7 @@ def test_http2_cookies(app):
|
||||||
|
|
||||||
assert response.text == 'Cookies are: working!'
|
assert response.text == 'Cookies are: working!'
|
||||||
|
|
||||||
|
|
||||||
def test_cookie_options(app):
|
def test_cookie_options(app):
|
||||||
|
|
||||||
@app.route('/')
|
@app.route('/')
|
||||||
|
@ -81,7 +82,8 @@ def test_cookie_options(app):
|
||||||
response = text("OK")
|
response = text("OK")
|
||||||
response.cookies['test'] = 'at you'
|
response.cookies['test'] = 'at you'
|
||||||
response.cookies['test']['httponly'] = True
|
response.cookies['test']['httponly'] = True
|
||||||
response.cookies['test']['expires'] = datetime.now() + timedelta(seconds=10)
|
response.cookies['test']['expires'] = (datetime.now() +
|
||||||
|
timedelta(seconds=10))
|
||||||
return response
|
return response
|
||||||
|
|
||||||
request, response = app.test_client.get('/')
|
request, response = app.test_client.get('/')
|
||||||
|
@ -89,7 +91,8 @@ def test_cookie_options(app):
|
||||||
response_cookies.load(response.headers.get('Set-Cookie', {}))
|
response_cookies.load(response.headers.get('Set-Cookie', {}))
|
||||||
|
|
||||||
assert response_cookies['test'].value == 'at you'
|
assert response_cookies['test'].value == 'at you'
|
||||||
assert response_cookies['test']['httponly'] == True
|
assert response_cookies['test']['httponly'] is True
|
||||||
|
|
||||||
|
|
||||||
def test_cookie_deletion(app):
|
def test_cookie_deletion(app):
|
||||||
|
|
||||||
|
@ -107,4 +110,4 @@ def test_cookie_deletion(app):
|
||||||
|
|
||||||
assert int(response_cookies['i_want_to_die']['max-age']) == 0
|
assert int(response_cookies['i_want_to_die']['max-age']) == 0
|
||||||
with pytest.raises(KeyError):
|
with pytest.raises(KeyError):
|
||||||
hold_my_beer = response.cookies['i_never_existed']
|
response.cookies['i_never_existed']
|
||||||
|
|
|
@ -28,6 +28,7 @@ def test_create_task(app):
|
||||||
request, response = app.test_client.get('/late')
|
request, response = app.test_client.get('/late')
|
||||||
assert response.body == b'True'
|
assert response.body == b'True'
|
||||||
|
|
||||||
|
|
||||||
def test_create_task_with_app_arg(app):
|
def test_create_task_with_app_arg(app):
|
||||||
q = Queue()
|
q = Queue()
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
from sanic import Sanic
|
|
||||||
from sanic.response import text
|
from sanic.response import text
|
||||||
from sanic.router import RouteExists
|
from sanic.router import RouteExists
|
||||||
import pytest
|
import pytest
|
||||||
|
|
|
@ -131,7 +131,7 @@ def test_exception_handler_lookup():
|
||||||
|
|
||||||
try:
|
try:
|
||||||
ModuleNotFoundError
|
ModuleNotFoundError
|
||||||
except:
|
except Exception:
|
||||||
class ModuleNotFoundError(ImportError):
|
class ModuleNotFoundError(ImportError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
|
@ -140,10 +140,11 @@ class ReuseableSanicTestClient(SanicTestClient):
|
||||||
if self._tcp_connector:
|
if self._tcp_connector:
|
||||||
conn = self._tcp_connector
|
conn = self._tcp_connector
|
||||||
else:
|
else:
|
||||||
conn = ReuseableTCPConnector(verify_ssl=False,
|
conn = ReuseableTCPConnector(
|
||||||
|
verify_ssl=False,
|
||||||
loop=self._loop,
|
loop=self._loop,
|
||||||
keepalive_timeout=
|
keepalive_timeout=request_keepalive
|
||||||
request_keepalive)
|
)
|
||||||
self._tcp_connector = conn
|
self._tcp_connector = conn
|
||||||
session = aiohttp.ClientSession(cookies=cookies,
|
session = aiohttp.ClientSession(cookies=cookies,
|
||||||
connector=conn,
|
connector=conn,
|
||||||
|
|
|
@ -11,11 +11,11 @@ def test_middleware_request(app):
|
||||||
results = []
|
results = []
|
||||||
|
|
||||||
@app.middleware
|
@app.middleware
|
||||||
async def handler(request):
|
async def handler1(request):
|
||||||
results.append(request)
|
results.append(request)
|
||||||
|
|
||||||
@app.route('/')
|
@app.route('/')
|
||||||
async def handler(request):
|
async def handler2(request):
|
||||||
return text('OK')
|
return text('OK')
|
||||||
|
|
||||||
request, response = app.test_client.get('/')
|
request, response = app.test_client.get('/')
|
||||||
|
@ -28,7 +28,7 @@ def test_middleware_response(app):
|
||||||
results = []
|
results = []
|
||||||
|
|
||||||
@app.middleware('request')
|
@app.middleware('request')
|
||||||
async def process_response(request):
|
async def process_request(request):
|
||||||
results.append(request)
|
results.append(request)
|
||||||
|
|
||||||
@app.middleware('response')
|
@app.middleware('response')
|
||||||
|
@ -68,6 +68,7 @@ def test_middleware_response_exception(app):
|
||||||
assert response.text == 'OK'
|
assert response.text == 'OK'
|
||||||
assert result['status_code'] == 404
|
assert result['status_code'] == 404
|
||||||
|
|
||||||
|
|
||||||
def test_middleware_override_request(app):
|
def test_middleware_override_request(app):
|
||||||
|
|
||||||
@app.middleware
|
@app.middleware
|
||||||
|
@ -134,4 +135,4 @@ def test_middleware_order(app):
|
||||||
request, response = app.test_client.get('/')
|
request, response = app.test_client.get('/')
|
||||||
|
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert order == [1,2,3,4,5,6]
|
assert order == [1, 2, 3, 4, 5, 6]
|
||||||
|
|
|
@ -336,7 +336,7 @@ def test_overload_routes(app):
|
||||||
return text('OK1')
|
return text('OK1')
|
||||||
|
|
||||||
@app.route('/overload', methods=['POST', 'PUT'], name='route_second')
|
@app.route('/overload', methods=['POST', 'PUT'], name='route_second')
|
||||||
async def handler1(request):
|
async def handler2(request):
|
||||||
return text('OK2')
|
return text('OK2')
|
||||||
|
|
||||||
request, response = app.test_client.get(app.url_for('route_first'))
|
request, response = app.test_client.get(app.url_for('route_first'))
|
||||||
|
|
|
@ -19,15 +19,15 @@ def redirect_app(app):
|
||||||
return text('OK')
|
return text('OK')
|
||||||
|
|
||||||
@app.route('/1')
|
@app.route('/1')
|
||||||
def handler(request):
|
def handler1(request):
|
||||||
return redirect('/2')
|
return redirect('/2')
|
||||||
|
|
||||||
@app.route('/2')
|
@app.route('/2')
|
||||||
def handler(request):
|
def handler2(request):
|
||||||
return redirect('/3')
|
return redirect('/3')
|
||||||
|
|
||||||
@app.route('/3')
|
@app.route('/3')
|
||||||
def handler(request):
|
def handler3(request):
|
||||||
return text('OK')
|
return text('OK')
|
||||||
|
|
||||||
@app.route('/redirect_with_header_injection')
|
@app.route('/redirect_with_header_injection')
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
import pytest
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import contextlib
|
import contextlib
|
||||||
|
|
||||||
|
@ -34,7 +33,7 @@ async def test_request_cancel_when_connection_lost(loop, app, test_client):
|
||||||
assert app.still_serving_cancelled_request is False
|
assert app.still_serving_cancelled_request is False
|
||||||
|
|
||||||
|
|
||||||
async def test_stream_request_cancel_when_connection_lost(loop, app, test_client):
|
async def test_stream_request_cancel_when_conn_lost(loop, app, test_client):
|
||||||
app.still_serving_cancelled_request = False
|
app.still_serving_cancelled_request = False
|
||||||
|
|
||||||
@app.post('/post/<id>', stream=True)
|
@app.post('/post/<id>', stream=True)
|
||||||
|
|
|
@ -18,7 +18,10 @@ def test_storage(app):
|
||||||
|
|
||||||
@app.route('/')
|
@app.route('/')
|
||||||
def handler(request):
|
def handler(request):
|
||||||
return json({'user': request.get('user'), 'sidekick': request.get('sidekick')})
|
return json({
|
||||||
|
'user': request.get('user'),
|
||||||
|
'sidekick': request.get('sidekick')
|
||||||
|
})
|
||||||
|
|
||||||
request, response = app.test_client.get('/')
|
request, response = app.test_client.get('/')
|
||||||
|
|
||||||
|
|
|
@ -181,7 +181,8 @@ def test_request_stream_handle_exception(app):
|
||||||
# 405
|
# 405
|
||||||
request, response = app.test_client.get('/post/random_id', data=data)
|
request, response = app.test_client.get('/post/random_id', data=data)
|
||||||
assert response.status == 405
|
assert response.status == 405
|
||||||
assert response.text == 'Error: Method GET not allowed for URL /post/random_id'
|
assert response.text == 'Error: Method GET not allowed for URL' \
|
||||||
|
' /post/random_id'
|
||||||
|
|
||||||
|
|
||||||
def test_request_stream_blueprint(app):
|
def test_request_stream_blueprint(app):
|
||||||
|
|
|
@ -78,7 +78,7 @@ class DelayableTCPConnector(TCPConnector):
|
||||||
await asyncio.sleep(self.delay)
|
await asyncio.sleep(self.delay)
|
||||||
t = req.loop.time()
|
t = req.loop.time()
|
||||||
print("sending at {}".format(t), flush=True)
|
print("sending at {}".format(t), flush=True)
|
||||||
conn = next(iter(args)) # first arg is connection
|
next(iter(args)) # first arg is connection
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return await self.orig_send(*args, **kwargs)
|
return await self.orig_send(*args, **kwargs)
|
||||||
|
|
|
@ -97,7 +97,7 @@ def test_json(app):
|
||||||
|
|
||||||
results = json_loads(response.text)
|
results = json_loads(response.text)
|
||||||
|
|
||||||
assert results.get('test') == True
|
assert results.get('test') is True
|
||||||
|
|
||||||
|
|
||||||
def test_empty_json(app):
|
def test_empty_json(app):
|
||||||
|
@ -278,22 +278,23 @@ def test_post_form_urlencoded(app):
|
||||||
payload = 'test=OK'
|
payload = 'test=OK'
|
||||||
headers = {'content-type': 'application/x-www-form-urlencoded'}
|
headers = {'content-type': 'application/x-www-form-urlencoded'}
|
||||||
|
|
||||||
request, response = app.test_client.post('/', data=payload, headers=headers)
|
request, response = app.test_client.post('/', data=payload,
|
||||||
|
headers=headers)
|
||||||
|
|
||||||
assert request.form.get('test') == 'OK'
|
assert request.form.get('test') == 'OK'
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'payload', [
|
'payload', [
|
||||||
'------sanic\r\n' \
|
'------sanic\r\n'
|
||||||
'Content-Disposition: form-data; name="test"\r\n' \
|
'Content-Disposition: form-data; name="test"\r\n'
|
||||||
'\r\n' \
|
'\r\n'
|
||||||
'OK\r\n' \
|
'OK\r\n'
|
||||||
'------sanic--\r\n',
|
'------sanic--\r\n',
|
||||||
'------sanic\r\n' \
|
'------sanic\r\n'
|
||||||
'content-disposition: form-data; name="test"\r\n' \
|
'content-disposition: form-data; name="test"\r\n'
|
||||||
'\r\n' \
|
'\r\n'
|
||||||
'OK\r\n' \
|
'OK\r\n'
|
||||||
'------sanic--\r\n',
|
'------sanic--\r\n',
|
||||||
])
|
])
|
||||||
def test_post_form_multipart_form_data(app, payload):
|
def test_post_form_multipart_form_data(app, payload):
|
||||||
|
|
|
@ -40,7 +40,7 @@ async def sample_streaming_fn(response):
|
||||||
def test_method_not_allowed(app):
|
def test_method_not_allowed(app):
|
||||||
|
|
||||||
@app.get('/')
|
@app.get('/')
|
||||||
async def test(request):
|
async def test_get(request):
|
||||||
return response.json({'hello': 'world'})
|
return response.json({'hello': 'world'})
|
||||||
|
|
||||||
request, response = app.test_client.head('/')
|
request, response = app.test_client.head('/')
|
||||||
|
@ -49,9 +49,8 @@ def test_method_not_allowed(app):
|
||||||
request, response = app.test_client.post('/')
|
request, response = app.test_client.post('/')
|
||||||
assert response.headers['Allow'] == 'GET'
|
assert response.headers['Allow'] == 'GET'
|
||||||
|
|
||||||
|
|
||||||
@app.post('/')
|
@app.post('/')
|
||||||
async def test(request):
|
async def test_post(request):
|
||||||
return response.json({'hello': 'world'})
|
return response.json({'hello': 'world'})
|
||||||
|
|
||||||
request, response = app.test_client.head('/')
|
request, response = app.test_client.head('/')
|
||||||
|
@ -239,7 +238,8 @@ def get_file_content(static_file_directory, file_name):
|
||||||
return file.read()
|
return file.read()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png'])
|
@pytest.mark.parametrize('file_name',
|
||||||
|
['test.file', 'decode me.txt', 'python.png'])
|
||||||
@pytest.mark.parametrize('status', [200, 401])
|
@pytest.mark.parametrize('status', [200, 401])
|
||||||
def test_file_response(app, file_name, static_file_directory, status):
|
def test_file_response(app, file_name, static_file_directory, status):
|
||||||
|
|
||||||
|
@ -256,9 +256,15 @@ def test_file_response(app, file_name, static_file_directory, status):
|
||||||
assert 'Content-Disposition' not in response.headers
|
assert 'Content-Disposition' not in response.headers
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('source,dest', [
|
@pytest.mark.parametrize(
|
||||||
('test.file', 'my_file.txt'), ('decode me.txt', 'readme.md'), ('python.png', 'logo.png')])
|
'source,dest',
|
||||||
def test_file_response_custom_filename(app, source, dest, static_file_directory):
|
[
|
||||||
|
('test.file', 'my_file.txt'), ('decode me.txt', 'readme.md'),
|
||||||
|
('python.png', 'logo.png')
|
||||||
|
]
|
||||||
|
)
|
||||||
|
def test_file_response_custom_filename(app, source, dest,
|
||||||
|
static_file_directory):
|
||||||
|
|
||||||
@app.route('/files/<filename>', methods=['GET'])
|
@app.route('/files/<filename>', methods=['GET'])
|
||||||
def file_route(request, filename):
|
def file_route(request, filename):
|
||||||
|
@ -269,7 +275,8 @@ def test_file_response_custom_filename(app, source, dest, static_file_directory)
|
||||||
request, response = app.test_client.get('/files/{}'.format(source))
|
request, response = app.test_client.get('/files/{}'.format(source))
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert response.body == get_file_content(static_file_directory, source)
|
assert response.body == get_file_content(static_file_directory, source)
|
||||||
assert response.headers['Content-Disposition'] == 'attachment; filename="{}"'.format(dest)
|
assert response.headers['Content-Disposition'] == \
|
||||||
|
'attachment; filename="{}"'.format(dest)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
|
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
|
||||||
|
@ -300,7 +307,8 @@ def test_file_head_response(app, file_name, static_file_directory):
|
||||||
get_file_content(static_file_directory, file_name))
|
get_file_content(static_file_directory, file_name))
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png'])
|
@pytest.mark.parametrize('file_name',
|
||||||
|
['test.file', 'decode me.txt', 'python.png'])
|
||||||
def test_file_stream_response(app, file_name, static_file_directory):
|
def test_file_stream_response(app, file_name, static_file_directory):
|
||||||
|
|
||||||
@app.route('/files/<filename>', methods=['GET'])
|
@app.route('/files/<filename>', methods=['GET'])
|
||||||
|
@ -316,9 +324,15 @@ def test_file_stream_response(app, file_name, static_file_directory):
|
||||||
assert 'Content-Disposition' not in response.headers
|
assert 'Content-Disposition' not in response.headers
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('source,dest', [
|
@pytest.mark.parametrize(
|
||||||
('test.file', 'my_file.txt'), ('decode me.txt', 'readme.md'), ('python.png', 'logo.png')])
|
'source,dest',
|
||||||
def test_file_stream_response_custom_filename(app, source, dest, static_file_directory):
|
[
|
||||||
|
('test.file', 'my_file.txt'), ('decode me.txt', 'readme.md'),
|
||||||
|
('python.png', 'logo.png')
|
||||||
|
]
|
||||||
|
)
|
||||||
|
def test_file_stream_response_custom_filename(app, source, dest,
|
||||||
|
static_file_directory):
|
||||||
|
|
||||||
@app.route('/files/<filename>', methods=['GET'])
|
@app.route('/files/<filename>', methods=['GET'])
|
||||||
def file_route(request, filename):
|
def file_route(request, filename):
|
||||||
|
@ -329,7 +343,8 @@ def test_file_stream_response_custom_filename(app, source, dest, static_file_dir
|
||||||
request, response = app.test_client.get('/files/{}'.format(source))
|
request, response = app.test_client.get('/files/{}'.format(source))
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert response.body == get_file_content(static_file_directory, source)
|
assert response.body == get_file_content(static_file_directory, source)
|
||||||
assert response.headers['Content-Disposition'] == 'attachment; filename="{}"'.format(dest)
|
assert response.headers['Content-Disposition'] == \
|
||||||
|
'attachment; filename="{}"'.format(dest)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
|
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
|
||||||
|
@ -350,8 +365,10 @@ def test_file_stream_head_response(app, file_name, static_file_directory):
|
||||||
headers=headers,
|
headers=headers,
|
||||||
content_type=guess_type(file_path)[0] or 'text/plain')
|
content_type=guess_type(file_path)[0] or 'text/plain')
|
||||||
else:
|
else:
|
||||||
return file_stream(file_path, chunk_size=32, headers=headers,
|
return file_stream(
|
||||||
mime_type=guess_type(file_path)[0] or 'text/plain')
|
file_path, chunk_size=32, headers=headers,
|
||||||
|
mime_type=guess_type(file_path)[0] or 'text/plain'
|
||||||
|
)
|
||||||
|
|
||||||
request, response = app.test_client.head('/files/{}'.format(file_name))
|
request, response = app.test_client.head('/files/{}'.format(file_name))
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
|
|
|
@ -64,12 +64,12 @@ def test_shorthand_routes_multiple(app):
|
||||||
def test_route_strict_slash(app):
|
def test_route_strict_slash(app):
|
||||||
|
|
||||||
@app.get('/get', strict_slashes=True)
|
@app.get('/get', strict_slashes=True)
|
||||||
def handler(request):
|
def handler1(request):
|
||||||
assert request.stream is None
|
assert request.stream is None
|
||||||
return text('OK')
|
return text('OK')
|
||||||
|
|
||||||
@app.post('/post/', strict_slashes=True)
|
@app.post('/post/', strict_slashes=True)
|
||||||
def handler(request):
|
def handler2(request):
|
||||||
assert request.stream is None
|
assert request.stream is None
|
||||||
return text('OK')
|
return text('OK')
|
||||||
|
|
||||||
|
@ -133,11 +133,11 @@ def test_route_strict_slash_default_value_can_be_overwritten():
|
||||||
def test_route_slashes_overload(app):
|
def test_route_slashes_overload(app):
|
||||||
|
|
||||||
@app.get('/hello/')
|
@app.get('/hello/')
|
||||||
def handler(request):
|
def handler_get(request):
|
||||||
return text('OK')
|
return text('OK')
|
||||||
|
|
||||||
@app.post('/hello/')
|
@app.post('/hello/')
|
||||||
def handler(request):
|
def handler_post(request):
|
||||||
return text('OK')
|
return text('OK')
|
||||||
|
|
||||||
request, response = app.test_client.get('/hello')
|
request, response = app.test_client.get('/hello')
|
||||||
|
@ -408,7 +408,8 @@ def test_dynamic_route_uuid(app):
|
||||||
results.append(unique_id)
|
results.append(unique_id)
|
||||||
return text('OK')
|
return text('OK')
|
||||||
|
|
||||||
request, response = app.test_client.get('/quirky/123e4567-e89b-12d3-a456-426655440000')
|
url = '/quirky/123e4567-e89b-12d3-a456-426655440000'
|
||||||
|
request, response = app.test_client.get(url)
|
||||||
assert response.text == 'OK'
|
assert response.text == 'OK'
|
||||||
assert type(results[0]) is uuid.UUID
|
assert type(results[0]) is uuid.UUID
|
||||||
|
|
||||||
|
@ -532,11 +533,11 @@ def test_route_duplicate(app):
|
||||||
|
|
||||||
with pytest.raises(RouteExists):
|
with pytest.raises(RouteExists):
|
||||||
@app.route('/test/<dynamic>/')
|
@app.route('/test/<dynamic>/')
|
||||||
async def handler1(request, dynamic):
|
async def handler3(request, dynamic):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@app.route('/test/<dynamic>/')
|
@app.route('/test/<dynamic>/')
|
||||||
async def handler2(request, dynamic):
|
async def handler4(request, dynamic):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@ -882,12 +883,12 @@ def test_unmergeable_overload_routes(app):
|
||||||
assert response.text == 'OK1'
|
assert response.text == 'OK1'
|
||||||
|
|
||||||
@app.route('/overload_part', methods=['GET'])
|
@app.route('/overload_part', methods=['GET'])
|
||||||
async def handler1(request):
|
async def handler3(request):
|
||||||
return text('OK1')
|
return text('OK1')
|
||||||
|
|
||||||
with pytest.raises(RouteExists):
|
with pytest.raises(RouteExists):
|
||||||
@app.route('/overload_part')
|
@app.route('/overload_part')
|
||||||
async def handler2(request):
|
async def handler4(request):
|
||||||
return text('Duplicated')
|
return text('Duplicated')
|
||||||
|
|
||||||
request, response = app.test_client.get('/overload_part')
|
request, response = app.test_client.get('/overload_part')
|
||||||
|
|
|
@ -11,12 +11,15 @@ async def stop(app, loop):
|
||||||
|
|
||||||
calledq = Queue()
|
calledq = Queue()
|
||||||
|
|
||||||
|
|
||||||
def set_loop(app, loop):
|
def set_loop(app, loop):
|
||||||
loop.add_signal_handler = MagicMock()
|
loop.add_signal_handler = MagicMock()
|
||||||
|
|
||||||
|
|
||||||
def after(app, loop):
|
def after(app, loop):
|
||||||
calledq.put(loop.add_signal_handler.called)
|
calledq.put(loop.add_signal_handler.called)
|
||||||
|
|
||||||
|
|
||||||
def test_register_system_signals(app):
|
def test_register_system_signals(app):
|
||||||
"""Test if sanic register system signals"""
|
"""Test if sanic register system signals"""
|
||||||
|
|
||||||
|
@ -29,7 +32,7 @@ def test_register_system_signals(app):
|
||||||
app.listener('after_server_stop')(after)
|
app.listener('after_server_stop')(after)
|
||||||
|
|
||||||
app.run(HOST, PORT)
|
app.run(HOST, PORT)
|
||||||
assert calledq.get() == True
|
assert calledq.get() is True
|
||||||
|
|
||||||
|
|
||||||
def test_dont_register_system_signals(app):
|
def test_dont_register_system_signals(app):
|
||||||
|
@ -44,4 +47,4 @@ def test_dont_register_system_signals(app):
|
||||||
app.listener('after_server_stop')(after)
|
app.listener('after_server_stop')(after)
|
||||||
|
|
||||||
app.run(HOST, PORT, register_sys_signals=False)
|
app.run(HOST, PORT, register_sys_signals=False)
|
||||||
assert calledq.get() == False
|
assert calledq.get() is False
|
||||||
|
|
|
@ -23,7 +23,8 @@ def get_file_content(static_file_directory, file_name):
|
||||||
return file.read()
|
return file.read()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png'])
|
@pytest.mark.parametrize('file_name',
|
||||||
|
['test.file', 'decode me.txt', 'python.png'])
|
||||||
def test_static_file(app, static_file_directory, file_name):
|
def test_static_file(app, static_file_directory, file_name):
|
||||||
app.static(
|
app.static(
|
||||||
'/testing.file', get_file_path(static_file_directory, file_name))
|
'/testing.file', get_file_path(static_file_directory, file_name))
|
||||||
|
@ -143,8 +144,8 @@ def test_static_content_range_empty(app, file_name, static_file_directory):
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert 'Content-Length' in response.headers
|
assert 'Content-Length' in response.headers
|
||||||
assert 'Content-Range' not in response.headers
|
assert 'Content-Range' not in response.headers
|
||||||
assert int(response.headers[
|
assert int(response.headers['Content-Length']) == \
|
||||||
'Content-Length']) == len(get_file_content(static_file_directory, file_name))
|
len(get_file_content(static_file_directory, file_name))
|
||||||
assert response.body == bytes(
|
assert response.body == bytes(
|
||||||
get_file_content(static_file_directory, file_name))
|
get_file_content(static_file_directory, file_name))
|
||||||
|
|
||||||
|
@ -166,7 +167,8 @@ def test_static_content_range_error(app, file_name, static_file_directory):
|
||||||
len(get_file_content(static_file_directory, file_name)),)
|
len(get_file_content(static_file_directory, file_name)),)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png'])
|
@pytest.mark.parametrize('file_name',
|
||||||
|
['test.file', 'decode me.txt', 'python.png'])
|
||||||
def test_static_file_specified_host(app, static_file_directory, file_name):
|
def test_static_file_specified_host(app, static_file_directory, file_name):
|
||||||
app.static(
|
app.static(
|
||||||
'/testing.file',
|
'/testing.file',
|
||||||
|
|
|
@ -13,12 +13,16 @@ URL_FOR_ARGS1 = dict(arg1=['v1', 'v2'])
|
||||||
URL_FOR_VALUE1 = '/myurl?arg1=v1&arg1=v2'
|
URL_FOR_VALUE1 = '/myurl?arg1=v1&arg1=v2'
|
||||||
URL_FOR_ARGS2 = dict(arg1=['v1', 'v2'], _anchor='anchor')
|
URL_FOR_ARGS2 = dict(arg1=['v1', 'v2'], _anchor='anchor')
|
||||||
URL_FOR_VALUE2 = '/myurl?arg1=v1&arg1=v2#anchor'
|
URL_FOR_VALUE2 = '/myurl?arg1=v1&arg1=v2#anchor'
|
||||||
URL_FOR_ARGS3 = dict(arg1='v1', _anchor='anchor', _scheme='http',
|
URL_FOR_ARGS3 = dict(
|
||||||
_server='{}:{}'.format(test_host, test_port), _external=True)
|
arg1='v1', _anchor='anchor', _scheme='http',
|
||||||
URL_FOR_VALUE3 = 'http://{}:{}/myurl?arg1=v1#anchor'.format(test_host, test_port)
|
_server='{}:{}'.format(test_host, test_port), _external=True
|
||||||
|
)
|
||||||
|
URL_FOR_VALUE3 = 'http://{}:{}/myurl?arg1=v1#anchor'.format(test_host,
|
||||||
|
test_port)
|
||||||
URL_FOR_ARGS4 = dict(arg1='v1', _anchor='anchor', _external=True,
|
URL_FOR_ARGS4 = dict(arg1='v1', _anchor='anchor', _external=True,
|
||||||
_server='http://{}:{}'.format(test_host, test_port))
|
_server='http://{}:{}'.format(test_host, test_port))
|
||||||
URL_FOR_VALUE4 = 'http://{}:{}/myurl?arg1=v1#anchor'.format(test_host, test_port)
|
URL_FOR_VALUE4 = 'http://{}:{}/myurl?arg1=v1#anchor'.format(test_host,
|
||||||
|
test_port)
|
||||||
|
|
||||||
|
|
||||||
def _generate_handlers_from_names(app, l):
|
def _generate_handlers_from_names(app, l):
|
||||||
|
|
|
@ -25,7 +25,8 @@ def get_file_content(static_file_directory, file_name):
|
||||||
return file.read()
|
return file.read()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png'])
|
@pytest.mark.parametrize('file_name',
|
||||||
|
['test.file', 'decode me.txt', 'python.png'])
|
||||||
def test_static_file(app, static_file_directory, file_name):
|
def test_static_file(app, static_file_directory, file_name):
|
||||||
app.static(
|
app.static(
|
||||||
'/testing.file', get_file_path(static_file_directory, file_name))
|
'/testing.file', get_file_path(static_file_directory, file_name))
|
||||||
|
@ -364,8 +365,8 @@ def test_static_content_range_empty(app, file_name, static_file_directory):
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert 'Content-Length' in response.headers
|
assert 'Content-Length' in response.headers
|
||||||
assert 'Content-Range' not in response.headers
|
assert 'Content-Range' not in response.headers
|
||||||
assert int(response.headers[
|
assert int(response.headers['Content-Length']) == \
|
||||||
'Content-Length']) == len(get_file_content(static_file_directory, file_name))
|
len(get_file_content(static_file_directory, file_name))
|
||||||
assert response.body == bytes(
|
assert response.body == bytes(
|
||||||
get_file_content(static_file_directory, file_name))
|
get_file_content(static_file_directory, file_name))
|
||||||
|
|
||||||
|
@ -384,8 +385,8 @@ def test_static_content_range_empty(app, file_name, static_file_directory):
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert 'Content-Length' in response.headers
|
assert 'Content-Length' in response.headers
|
||||||
assert 'Content-Range' not in response.headers
|
assert 'Content-Range' not in response.headers
|
||||||
assert int(response.headers[
|
assert int(response.headers['Content-Length']) == \
|
||||||
'Content-Length']) == len(get_file_content(static_file_directory, file_name))
|
len(get_file_content(static_file_directory, file_name))
|
||||||
assert response.body == bytes(
|
assert response.body == bytes(
|
||||||
get_file_content(static_file_directory, file_name))
|
get_file_content(static_file_directory, file_name))
|
||||||
|
|
||||||
|
|
|
@ -4,11 +4,11 @@ from sanic.response import text
|
||||||
def test_vhosts(app):
|
def test_vhosts(app):
|
||||||
|
|
||||||
@app.route('/', host="example.com")
|
@app.route('/', host="example.com")
|
||||||
async def handler(request):
|
async def handler1(request):
|
||||||
return text("You're at example.com!")
|
return text("You're at example.com!")
|
||||||
|
|
||||||
@app.route('/', host="subdomain.example.com")
|
@app.route('/', host="subdomain.example.com")
|
||||||
async def handler(request):
|
async def handler2(request):
|
||||||
return text("You're at subdomain.example.com!")
|
return text("You're at subdomain.example.com!")
|
||||||
|
|
||||||
headers = {"Host": "example.com"}
|
headers = {"Host": "example.com"}
|
||||||
|
@ -38,11 +38,11 @@ def test_vhosts_with_list(app):
|
||||||
def test_vhosts_with_defaults(app):
|
def test_vhosts_with_defaults(app):
|
||||||
|
|
||||||
@app.route('/', host="hello.com")
|
@app.route('/', host="hello.com")
|
||||||
async def handler(request):
|
async def handler1(request):
|
||||||
return text("Hello, world!")
|
return text("Hello, world!")
|
||||||
|
|
||||||
@app.route('/')
|
@app.route('/')
|
||||||
async def handler(request):
|
async def handler2(request):
|
||||||
return text("default")
|
return text("default")
|
||||||
|
|
||||||
headers = {"Host": "hello.com"}
|
headers = {"Host": "hello.com"}
|
||||||
|
|
|
@ -129,7 +129,7 @@ def test_with_middleware_response(app):
|
||||||
results = []
|
results = []
|
||||||
|
|
||||||
@app.middleware('request')
|
@app.middleware('request')
|
||||||
async def process_response(request):
|
async def process_request(request):
|
||||||
results.append(request)
|
results.append(request)
|
||||||
|
|
||||||
@app.middleware('response')
|
@app.middleware('response')
|
||||||
|
@ -162,7 +162,8 @@ def test_with_custom_class_methods(app):
|
||||||
|
|
||||||
def get(self, request):
|
def get(self, request):
|
||||||
self._iternal_method()
|
self._iternal_method()
|
||||||
return text('I am get method and global var is {}'.format(self.global_var))
|
return text('I am get method and global var '
|
||||||
|
'is {}'.format(self.global_var))
|
||||||
|
|
||||||
app.add_route(DummyView.as_view(), '/')
|
app.add_route(DummyView.as_view(), '/')
|
||||||
request, response = app.test_client.get('/')
|
request, response = app.test_client.get('/')
|
||||||
|
|
|
@ -102,8 +102,8 @@ def test_run_max_requests_exceeded(worker):
|
||||||
|
|
||||||
assert not worker.alive
|
assert not worker.alive
|
||||||
worker.notify.assert_called_with()
|
worker.notify.assert_called_with()
|
||||||
worker.log.info.assert_called_with("Max requests exceeded, shutting down: %s",
|
worker.log.info.assert_called_with("Max requests exceeded, shutting "
|
||||||
worker)
|
"down: %s", worker)
|
||||||
|
|
||||||
|
|
||||||
def test_worker_close(worker):
|
def test_worker_close(worker):
|
||||||
|
@ -125,7 +125,8 @@ def test_worker_close(worker):
|
||||||
worker.loop = loop
|
worker.loop = loop
|
||||||
server = mock.Mock()
|
server = mock.Mock()
|
||||||
server.close = mock.Mock(wraps=lambda *a, **kw: None)
|
server.close = mock.Mock(wraps=lambda *a, **kw: None)
|
||||||
server.wait_closed = mock.Mock(wraps=asyncio.coroutine(lambda *a, **kw: None))
|
server.wait_closed = mock.Mock(wraps=asyncio.coroutine(
|
||||||
|
lambda *a, **kw: None))
|
||||||
worker.servers = {
|
worker.servers = {
|
||||||
server: {"requests_count": 14},
|
server: {"requests_count": 14},
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user