Merge branch 'master' into forbidden-exception

This commit is contained in:
François
2017-06-28 17:25:40 +02:00
committed by GitHub
11 changed files with 305 additions and 96 deletions

View File

@@ -3,8 +3,8 @@ from bs4 import BeautifulSoup
from sanic import Sanic
from sanic.response import text
from sanic.exceptions import InvalidUsage, ServerError, NotFound, Forbidden
from sanic.exceptions import abort
from sanic.exceptions import InvalidUsage, ServerError, NotFound, Unauthorized
from sanic.exceptions import Forbidden, abort
class SanicExceptionTestException(Exception):
@@ -31,6 +31,20 @@ def exception_app():
def handler_403(request):
raise Forbidden("Forbidden")
@app.route('/401/basic')
def handler_401_basic(request):
raise Unauthorized("Unauthorized", "Basic", "Sanic")
@app.route('/401/digest')
def handler_401_digest(request):
challenge = {
"qop": "auth, auth-int",
"algorithm": "MD5",
"nonce": "abcdef",
"opaque": "zyxwvu",
}
raise Unauthorized("Unauthorized", "Digest", "Sanic", challenge)
@app.route('/invalid')
def handler_invalid(request):
raise InvalidUsage("OK")
@@ -54,8 +68,10 @@ def exception_app():
return app
def test_catch_exception_list():
app = Sanic('exception_list')
@app.exception([SanicExceptionTestException, NotFound])
def exception_list(request, exception):
return text("ok")
@@ -101,6 +117,25 @@ def test_forbidden_exception(exception_app):
request, response = exception_app.test_client.get('/403')
assert response.status == 403
def test_unauthorized_exception(exception_app):
"""Test the built-in Unauthorized exception"""
request, response = exception_app.test_client.get('/401/basic')
assert response.status == 401
assert response.headers.get('WWW-Authenticate') is not None
assert response.headers.get('WWW-Authenticate') == "Basic realm='Sanic'"
request, response = exception_app.test_client.get('/401/digest')
assert response.status == 401
auth_header = response.headers.get('WWW-Authenticate')
assert auth_header is not None
assert auth_header.startswith('Digest')
assert "qop='auth, auth-int'" in auth_header
assert "algorithm='MD5'" in auth_header
assert "nonce='abcdef'" in auth_header
assert "opaque='zyxwvu'" in auth_header
def test_handled_unhandled_exception(exception_app):
"""Test that an exception not built into sanic is handled"""

View File

@@ -180,6 +180,16 @@ def test_token():
request, response = app.test_client.get('/', headers=headers)
assert request.token == token
token = 'a1d895e0-553a-421a-8e22-5ff8ecb48cbf'
headers = {
'content-type': 'application/json',
'Authorization': 'Bearer {}'.format(token)
}
request, response = app.test_client.get('/', headers=headers)
assert request.token == token
# no Authorization headers

View File

@@ -3,7 +3,11 @@ import json
import shlex
import subprocess
import urllib.request
from unittest import mock
from sanic.worker import GunicornWorker
from sanic.app import Sanic
import asyncio
import logging
import pytest
@@ -20,3 +24,79 @@ def test_gunicorn_worker(gunicorn_worker):
with urllib.request.urlopen('http://localhost:1337/') as f:
res = json.loads(f.read(100).decode())
assert res['test']
class GunicornTestWorker(GunicornWorker):
def __init__(self):
self.app = mock.Mock()
self.app.callable = Sanic("test_gunicorn_worker")
self.servers = {}
self.exit_code = 0
self.cfg = mock.Mock()
self.notify = mock.Mock()
@pytest.fixture
def worker():
return GunicornTestWorker()
def test_worker_init_process(worker):
with mock.patch('sanic.worker.asyncio') as mock_asyncio:
try:
worker.init_process()
except TypeError:
pass
assert mock_asyncio.get_event_loop.return_value.close.called
assert mock_asyncio.new_event_loop.called
assert mock_asyncio.set_event_loop.called
def test_worker_init_signals(worker):
worker.loop = mock.Mock()
worker.init_signals()
assert worker.loop.add_signal_handler.called
def test_handle_abort(worker):
with mock.patch('sanic.worker.sys') as mock_sys:
worker.handle_abort(object(), object())
assert not worker.alive
assert worker.exit_code == 1
mock_sys.exit.assert_called_with(1)
def test_handle_quit(worker):
worker.handle_quit(object(), object())
assert not worker.alive
assert worker.exit_code == 0
def test_run_max_requests_exceeded(worker):
loop = asyncio.new_event_loop()
worker.ppid = 1
worker.alive = True
sock = mock.Mock()
sock.cfg_addr = ('localhost', 8080)
worker.sockets = [sock]
worker.wsgi = mock.Mock()
worker.connections = set()
worker.log = mock.Mock()
worker.loop = loop
worker.servers = {
"server1": {"requests_count": 14},
"server2": {"requests_count": 15},
}
worker.max_requests = 10
worker._run = mock.Mock(wraps=asyncio.coroutine(lambda *a, **kw: None))
# exceeding request count
_runner = asyncio.ensure_future(worker._check_alive(), loop=loop)
loop.run_until_complete(_runner)
assert worker.alive == False
worker.notify.assert_called_with()
worker.log.info.assert_called_with("Max requests exceeded, shutting down: %s",
worker)