c01cbb3a8c
This adds a request timeout exception. It cancels the task when the request times out.
41 lines
1.1 KiB
Python
41 lines
1.1 KiB
Python
from sanic import Sanic
|
|
import asyncio
|
|
from sanic.response import text
|
|
from sanic.exceptions import RequestTimeout
|
|
from sanic.utils import sanic_endpoint_test
|
|
from sanic.config import Config
|
|
|
|
# Lower the request timeout to 1 (presumably seconds -- the handlers in this
# file sleep for 2 to exceed it). The value is assigned on the imported Config
# object itself, not on a single app's configuration.
Config.REQUEST_TIMEOUT = 1

# App that gets a custom RequestTimeout exception handler further down.
request_timeout_app = Sanic('test_request_timeout')

# App with no exception handler registered in this file; used to check the
# response produced when RequestTimeout is not handled by the test itself.
request_timeout_default_app = Sanic('test_request_timeout_default')
|
|
|
|
|
|
@request_timeout_app.route('/1')
async def handler_1(request):
    """Respond with 'OK' only after sleeping for two seconds.

    The sleep is longer than the REQUEST_TIMEOUT of 1 configured at the top
    of this module, so the test expects the request to time out before this
    handler gets to return.
    """
    stall_seconds = 2
    await asyncio.sleep(stall_seconds)
    return text('OK')
|
|
|
|
|
|
@request_timeout_app.exception(RequestTimeout)
def handler_exception(request, exception):
    """Turn a RequestTimeout on request_timeout_app into a custom 408 reply.

    Neither ``request`` nor ``exception`` is inspected; the reply is fixed.
    """
    timeout_status = 408
    reply = text('Request Timeout from error_handler.', timeout_status)
    return reply
|
|
|
|
|
|
def test_server_error_request_timeout():
    """Check that the custom RequestTimeout handler produces the response.

    Requests '/1' on request_timeout_app, whose handler sleeps longer than
    the configured timeout, and expects the 408 reply built by
    handler_exception.
    """
    expected_status = 408
    expected_body = 'Request Timeout from error_handler.'

    _request, reply = sanic_endpoint_test(request_timeout_app, uri='/1')

    assert reply.status == expected_status
    assert reply.text == expected_body
|
|
|
|
|
|
@request_timeout_default_app.route('/1')
async def handler_2(request):
    """Sleep for two seconds, then answer 'OK'.

    Registered on the app that has no custom RequestTimeout handler in this
    file; the sleep exceeds the REQUEST_TIMEOUT of 1 set at module level.
    """
    # Deliberately outlast the configured timeout before responding.
    pause = 2
    await asyncio.sleep(pause)
    ok_response = text('OK')
    return ok_response
|
|
|
|
|
|
def test_default_server_error_request_timeout():
    """Check the timeout response when the test registers no custom handler.

    Requests '/1' on request_timeout_default_app and expects a 408 status
    with the body 'Error: Request Timeout'.
    """
    outcome = sanic_endpoint_test(request_timeout_default_app, uri='/1')
    # Two-element unpacking, same as the original: (request, response).
    _request, reply = outcome

    assert reply.status == 408
    assert reply.text == 'Error: Request Timeout'
|