WIP - Split RequestTimeout, ResponseTimout, and KeepAliveTimeout into different timeouts, with different callbacks.

This commit is contained in:
Ashley Sommer 2017-09-11 17:17:33 +10:00
parent 53a5bd2319
commit 2979e03148
7 changed files with 395 additions and 46 deletions

View File

@ -745,6 +745,8 @@ class Sanic:
'request_handler': self.handle_request, 'request_handler': self.handle_request,
'error_handler': self.error_handler, 'error_handler': self.error_handler,
'request_timeout': self.config.REQUEST_TIMEOUT, 'request_timeout': self.config.REQUEST_TIMEOUT,
'response_timeout': self.config.RESPONSE_TIMEOUT,
'keep_alive_timeout': self.config.KEEP_ALIVE_TIMEOUT,
'request_max_size': self.config.REQUEST_MAX_SIZE, 'request_max_size': self.config.REQUEST_MAX_SIZE,
'keep_alive': self.config.KEEP_ALIVE, 'keep_alive': self.config.KEEP_ALIVE,
'loop': loop, 'loop': loop,

View File

@ -125,7 +125,15 @@ class Config(dict):
""" """
self.REQUEST_MAX_SIZE = 100000000 # 100 megabytes self.REQUEST_MAX_SIZE = 100000000 # 100 megabytes
self.REQUEST_TIMEOUT = 60 # 60 seconds self.REQUEST_TIMEOUT = 60 # 60 seconds
self.RESPONSE_TIMEOUT = 60 # 60 seconds
self.KEEP_ALIVE = keep_alive self.KEEP_ALIVE = keep_alive
# Apache httpd server default keepalive timeout = 5 seconds
# Nginx server default keepalive timeout = 75 seconds
# Nginx performance tuning guidelines uses keepalive timeout = 15 seconds
# IE client hard keepalive limit = 60 seconds
# Firefox client hard keepalive limit = 115 seconds
self.KEEP_ALIVE_TIMEOUT = 5 # 5 seconds
self.WEBSOCKET_MAX_SIZE = 2 ** 20 # 1 megabytes self.WEBSOCKET_MAX_SIZE = 2 ** 20 # 1 megabytes
self.WEBSOCKET_MAX_QUEUE = 32 self.WEBSOCKET_MAX_QUEUE = 32
self.GRACEFUL_SHUTDOWN_TIMEOUT = 15.0 # 15 sec self.GRACEFUL_SHUTDOWN_TIMEOUT = 15.0 # 15 sec

View File

@ -155,6 +155,13 @@ class ServerError(SanicException):
pass pass
@add_status_code(503)
class ServiceUnavailable(SanicException):
"""The server is currently unavailable (because it is overloaded or
down for maintenance). Generally, this is a temporary state."""
pass
class URLBuildError(ServerError): class URLBuildError(ServerError):
pass pass
@ -170,6 +177,13 @@ class FileNotFound(NotFound):
@add_status_code(408) @add_status_code(408)
class RequestTimeout(SanicException): class RequestTimeout(SanicException):
"""The Web server (running the Web site) thinks that there has been too
long an interval of time between 1) the establishment of an IP
connection (socket) between the client and the server and
2) the receipt of any data on that socket, so the server has dropped
the connection. The socket connection has actually been lost - the Web
server has 'timed out' on that particular socket connection.
"""
pass pass

View File

@ -28,7 +28,8 @@ from sanic.log import log, netlog
from sanic.response import HTTPResponse from sanic.response import HTTPResponse
from sanic.request import Request from sanic.request import Request
from sanic.exceptions import ( from sanic.exceptions import (
RequestTimeout, PayloadTooLarge, InvalidUsage, ServerError) RequestTimeout, PayloadTooLarge, InvalidUsage, ServerError,
ServiceUnavailable)
current_time = None current_time = None
@ -63,16 +64,19 @@ class HttpProtocol(asyncio.Protocol):
# request params # request params
'parser', 'request', 'url', 'headers', 'parser', 'request', 'url', 'headers',
# request config # request config
'request_handler', 'request_timeout', 'request_max_size', 'request_handler', 'request_timeout', 'response_timeout',
'request_class', 'is_request_stream', 'router', 'keep_alive_timeout', 'request_max_size', 'request_class',
'is_request_stream', 'router',
# enable or disable access log / error log purpose # enable or disable access log / error log purpose
'has_log', 'has_log',
# connection management # connection management
'_total_request_size', '_timeout_handler', '_last_communication_time', '_total_request_size', '_request_timeout_handler',
'_is_stream_handler') '_response_timeout_handler', '_keep_alive_timeout_handler',
'_last_request_time', '_last_response_time', '_is_stream_handler')
def __init__(self, *, loop, request_handler, error_handler, def __init__(self, *, loop, request_handler, error_handler,
signal=Signal(), connections=set(), request_timeout=60, signal=Signal(), connections=set(), request_timeout=60,
response_timeout=60, keep_alive_timeout=15,
request_max_size=None, request_class=None, has_log=True, request_max_size=None, request_class=None, has_log=True,
keep_alive=True, is_request_stream=False, router=None, keep_alive=True, is_request_stream=False, router=None,
state=None, debug=False, **kwargs): state=None, debug=False, **kwargs):
@ -89,13 +93,18 @@ class HttpProtocol(asyncio.Protocol):
self.request_handler = request_handler self.request_handler = request_handler
self.error_handler = error_handler self.error_handler = error_handler
self.request_timeout = request_timeout self.request_timeout = request_timeout
self.response_timeout = response_timeout
self.keep_alive_timeout = keep_alive_timeout
self.request_max_size = request_max_size self.request_max_size = request_max_size
self.request_class = request_class or Request self.request_class = request_class or Request
self.is_request_stream = is_request_stream self.is_request_stream = is_request_stream
self._is_stream_handler = False self._is_stream_handler = False
self._total_request_size = 0 self._total_request_size = 0
self._timeout_handler = None self._request_timeout_handler = None
self._response_timeout_handler = None
self._keep_alive_timeout_handler = None
self._last_request_time = None self._last_request_time = None
self._last_response_time = None
self._request_handler_task = None self._request_handler_task = None
self._request_stream_task = None self._request_stream_task = None
self._keep_alive = keep_alive self._keep_alive = keep_alive
@ -118,22 +127,32 @@ class HttpProtocol(asyncio.Protocol):
def connection_made(self, transport): def connection_made(self, transport):
self.connections.add(self) self.connections.add(self)
self._timeout_handler = self.loop.call_later( self._request_timeout_handler = self.loop.call_later(
self.request_timeout, self.connection_timeout) self.request_timeout, self.request_timeout_callback)
self.transport = transport self.transport = transport
self._last_request_time = current_time self._last_request_time = current_time
def connection_lost(self, exc): def connection_lost(self, exc):
self.connections.discard(self) self.connections.discard(self)
self._timeout_handler.cancel() if self._request_timeout_handler:
self._request_timeout_handler.cancel()
if self._response_timeout_handler:
self._response_timeout_handler.cancel()
if self._keep_alive_timeout_handler:
self._keep_alive_timeout_handler.cancel()
def connection_timeout(self): def request_timeout_callback(self):
# Check if # See the docstring in the RequestTimeout exception, to see
# exactly what this timeout is checking for.
# Check if elapsed time since request initiated exceeds our
# configured maximum request timeout value
time_elapsed = current_time - self._last_request_time time_elapsed = current_time - self._last_request_time
if time_elapsed < self.request_timeout: if time_elapsed < self.request_timeout:
time_left = self.request_timeout - time_elapsed time_left = self.request_timeout - time_elapsed
self._timeout_handler = ( self._request_timeout_handler = (
self.loop.call_later(time_left, self.connection_timeout)) self.loop.call_later(time_left,
self.request_timeout_callback)
)
else: else:
if self._request_stream_task: if self._request_stream_task:
self._request_stream_task.cancel() self._request_stream_task.cancel()
@ -144,6 +163,37 @@ class HttpProtocol(asyncio.Protocol):
except RequestTimeout as exception: except RequestTimeout as exception:
self.write_error(exception) self.write_error(exception)
def response_timeout_callback(self):
# Check if elapsed time since response was initiated exceeds our
# configured maximum request timeout value
time_elapsed = current_time - self._last_request_time
if time_elapsed < self.response_timeout:
time_left = self.response_timeout - time_elapsed
self._response_timeout_handler = (
self.loop.call_later(time_left,
self.response_timeout_callback)
)
else:
try:
raise ServiceUnavailable('Response Timeout')
except ServiceUnavailable as exception:
self.write_error(exception)
def keep_alive_timeout_callback(self):
# Check if elapsed time since last response exceeds our configured
# maximum keep alive timeout value
time_elapsed = current_time - self._last_response_time
if time_elapsed < self.keep_alive_timeout:
time_left = self.keep_alive_timeout - time_elapsed
self._keep_alive_timeout_handler = (
self.loop.call_later(time_left,
self.keep_alive_timeout_callback)
)
else:
log.info('KeepAlive Timeout. Closing connection.')
self.transport.close()
# -------------------------------------------- # # -------------------------------------------- #
# Parsing # Parsing
# -------------------------------------------- # # -------------------------------------------- #
@ -204,6 +254,11 @@ class HttpProtocol(asyncio.Protocol):
method=self.parser.get_method().decode(), method=self.parser.get_method().decode(),
transport=self.transport transport=self.transport
) )
# Remove any existing KeepAlive handler here,
# It will be recreated if required on the new request.
if self._keep_alive_timeout_handler:
self._keep_alive_timeout_handler.cancel()
self._keep_alive_timeout_handler = None
if self.is_request_stream: if self.is_request_stream:
self._is_stream_handler = self.router.is_stream_handler( self._is_stream_handler = self.router.is_stream_handler(
self.request) self.request)
@ -219,6 +274,11 @@ class HttpProtocol(asyncio.Protocol):
self.request.body.append(body) self.request.body.append(body)
def on_message_complete(self): def on_message_complete(self):
# Entire request (headers and whole body) is received.
# We can cancel and remove the request timeout handler now.
if self._request_timeout_handler:
self._request_timeout_handler.cancel()
self._request_timeout_handler = None
if self.is_request_stream and self._is_stream_handler: if self.is_request_stream and self._is_stream_handler:
self._request_stream_task = self.loop.create_task( self._request_stream_task = self.loop.create_task(
self.request.stream.put(None)) self.request.stream.put(None))
@ -227,6 +287,9 @@ class HttpProtocol(asyncio.Protocol):
self.execute_request_handler() self.execute_request_handler()
def execute_request_handler(self): def execute_request_handler(self):
self._response_timeout_handler = self.loop.call_later(
self.response_timeout, self.response_timeout_callback)
self._last_request_time = current_time
self._request_handler_task = self.loop.create_task( self._request_handler_task = self.loop.create_task(
self.request_handler( self.request_handler(
self.request, self.request,
@ -240,12 +303,15 @@ class HttpProtocol(asyncio.Protocol):
""" """
Writes response content synchronously to the transport. Writes response content synchronously to the transport.
""" """
if self._response_timeout_handler:
self._response_timeout_handler.cancel()
self._response_timeout_handler = None
try: try:
keep_alive = self.keep_alive keep_alive = self.keep_alive
self.transport.write( self.transport.write(
response.output( response.output(
self.request.version, keep_alive, self.request.version, keep_alive,
self.request_timeout)) self.keep_alive_timeout))
if self.has_log: if self.has_log:
netlog.info('', extra={ netlog.info('', extra={
'status': response.status, 'status': response.status,
@ -273,7 +339,10 @@ class HttpProtocol(asyncio.Protocol):
if not keep_alive: if not keep_alive:
self.transport.close() self.transport.close()
else: else:
self._last_request_time = current_time self._keep_alive_timeout_handler = self.loop.call_later(
self.keep_alive_timeout,
self.keep_alive_timeout_callback)
self._last_response_time = current_time
self.cleanup() self.cleanup()
async def stream_response(self, response): async def stream_response(self, response):
@ -282,12 +351,14 @@ class HttpProtocol(asyncio.Protocol):
the transport to the response so the response consumer can the transport to the response so the response consumer can
write to the response as needed. write to the response as needed.
""" """
if self._response_timeout_handler:
self._response_timeout_handler.cancel()
self._response_timeout_handler = None
try: try:
keep_alive = self.keep_alive keep_alive = self.keep_alive
response.transport = self.transport response.transport = self.transport
await response.stream( await response.stream(
self.request.version, keep_alive, self.request_timeout) self.request.version, keep_alive, self.keep_alive_timeout)
if self.has_log: if self.has_log:
netlog.info('', extra={ netlog.info('', extra={
'status': response.status, 'status': response.status,
@ -315,10 +386,18 @@ class HttpProtocol(asyncio.Protocol):
if not keep_alive: if not keep_alive:
self.transport.close() self.transport.close()
else: else:
self._last_request_time = current_time self._keep_alive_timeout_handler = self.loop.call_later(
self.keep_alive_timeout,
self.keep_alive_timeout_callback)
self._last_response_time = current_time
self.cleanup() self.cleanup()
def write_error(self, exception): def write_error(self, exception):
# An error _is_ a response.
# Don't throw a response timeout, when a response _is_ given.
if self._response_timeout_handler:
self._response_timeout_handler.cancel()
self._response_timeout_handler = None
response = None response = None
try: try:
response = self.error_handler.response(self.request, exception) response = self.error_handler.response(self.request, exception)
@ -330,8 +409,9 @@ class HttpProtocol(asyncio.Protocol):
self.request.ip if self.request else 'Unknown')) self.request.ip if self.request else 'Unknown'))
except Exception as e: except Exception as e:
self.bail_out( self.bail_out(
"Writing error failed, connection closed {}".format(repr(e)), "Writing error failed, connection closed {}".format(
from_error=True) repr(e)), from_error=True
)
finally: finally:
if self.has_log: if self.has_log:
extra = dict() extra = dict()
@ -367,6 +447,9 @@ class HttpProtocol(asyncio.Protocol):
log.error(message) log.error(message)
def cleanup(self): def cleanup(self):
"""This is called when KeepAlive feature is used,
it resets the connection in order for it to be able
to handle receiving another request on the same connection."""
self.parser = None self.parser = None
self.request = None self.request = None
self.url = None self.url = None
@ -421,12 +504,13 @@ def trigger_events(events, loop):
def serve(host, port, request_handler, error_handler, before_start=None, def serve(host, port, request_handler, error_handler, before_start=None,
after_start=None, before_stop=None, after_stop=None, debug=False, after_start=None, before_stop=None, after_stop=None, debug=False,
request_timeout=60, ssl=None, sock=None, request_max_size=None, request_timeout=60, response_timeout=60, keep_alive_timeout=60,
reuse_port=False, loop=None, protocol=HttpProtocol, backlog=100, ssl=None, sock=None, request_max_size=None, reuse_port=False,
loop=None, protocol=HttpProtocol, backlog=100,
register_sys_signals=True, run_async=False, connections=None, register_sys_signals=True, run_async=False, connections=None,
signal=Signal(), request_class=None, has_log=True, keep_alive=True, signal=Signal(), request_class=None, has_log=True,
is_request_stream=False, router=None, websocket_max_size=None, keep_alive=True, is_request_stream=False, router=None,
websocket_max_queue=None, state=None, websocket_max_size=None, websocket_max_queue=None, state=None,
graceful_shutdown_timeout=15.0): graceful_shutdown_timeout=15.0):
"""Start asynchronous HTTP Server on an individual process. """Start asynchronous HTTP Server on an individual process.
@ -474,6 +558,8 @@ def serve(host, port, request_handler, error_handler, before_start=None,
request_handler=request_handler, request_handler=request_handler,
error_handler=error_handler, error_handler=error_handler,
request_timeout=request_timeout, request_timeout=request_timeout,
response_timeout=response_timeout,
keep_alive_timeout=keep_alive_timeout,
request_max_size=request_max_size, request_max_size=request_max_size,
request_class=request_class, request_class=request_class,
has_log=has_log, has_log=has_log,

View File

@ -0,0 +1,142 @@
from json import JSONDecodeError
from sanic import Sanic
from time import sleep as sync_sleep
import asyncio
from sanic.response import text
from sanic.config import Config
import aiohttp
from aiohttp import TCPConnector
from sanic.testing import SanicTestClient, HOST, PORT
class ReuseableTCPConnector(TCPConnector):
def __init__(self, *args, **kwargs):
super(ReuseableTCPConnector, self).__init__(*args, **kwargs)
self.conn = None
@asyncio.coroutine
def connect(self, req):
if self.conn:
return self.conn
conn = yield from super(ReuseableTCPConnector, self).connect(req)
self.conn = conn
return conn
def close(self):
return super(ReuseableTCPConnector, self).close()
class ReuseableSanicTestClient(SanicTestClient):
def __init__(self, app):
super(ReuseableSanicTestClient, self).__init__(app)
self._tcp_connector = None
self._session = None
def _sanic_endpoint_test(
self, method='get', uri='/', gather_request=True,
debug=False, server_kwargs={},
*request_args, **request_kwargs):
results = [None, None]
exceptions = []
if gather_request:
def _collect_request(request):
if results[0] is None:
results[0] = request
self.app.request_middleware.appendleft(_collect_request)
@self.app.listener('after_server_start')
async def _collect_response(sanic, loop):
try:
response = await self._local_request(
method, uri, *request_args,
**request_kwargs)
results[-1] = response
except Exception as e:
log.error(
'Exception:\n{}'.format(traceback.format_exc()))
exceptions.append(e)
self.app.stop()
server = self.app.create_server(host=HOST, debug=debug, port=PORT, **server_kwargs)
self.app.listeners['after_server_start'].pop()
if exceptions:
raise ValueError(
"Exception during request: {}".format(exceptions))
if gather_request:
try:
request, response = results
return request, response
except:
raise ValueError(
"Request and response object expected, got ({})".format(
results))
else:
try:
return results[-1]
except:
raise ValueError(
"Request object expected, got ({})".format(results))
async def _local_request(self, method, uri, cookies=None, *args,
**kwargs):
if uri.startswith(('http:', 'https:', 'ftp:', 'ftps://' '//')):
url = uri
else:
url = 'http://{host}:{port}{uri}'.format(
host=HOST, port=PORT, uri=uri)
if self._session:
session = self._session
else:
if self._tcp_connector:
conn = self._tcp_connector
else:
conn = ReuseableTCPConnector(verify_ssl=False)
self._tcp_connector = conn
session = aiohttp.ClientSession(cookies=cookies,
connector=conn)
self._session = session
async with getattr(session, method.lower())(
url, *args, **kwargs) as response:
try:
response.text = await response.text()
except UnicodeDecodeError:
response.text = None
try:
response.json = await response.json()
except (JSONDecodeError,
UnicodeDecodeError,
aiohttp.ClientResponseError):
response.json = None
response.body = await response.read()
return response
Config.KEEP_ALIVE_TIMEOUT = 30
Config.KEEP_ALIVE = True
keep_alive_timeout_app = Sanic('test_request_timeout')
@keep_alive_timeout_app.route('/1')
async def handler(request):
return text('OK')
def test_keep_alive_timeout():
client = ReuseableSanicTestClient(keep_alive_timeout_app)
headers = {
'Connection': 'keep-alive'
}
request, response = client.get('/1', headers=headers)
assert response.status == 200
#sync_sleep(2)
request, response = client.get('/1')
assert response.status == 200

View File

@ -1,38 +1,97 @@
from json import JSONDecodeError
from sanic import Sanic from sanic import Sanic
import asyncio import asyncio
from sanic.response import text from sanic.response import text
from sanic.exceptions import RequestTimeout from sanic.exceptions import RequestTimeout
from sanic.config import Config from sanic.config import Config
import aiohttp
from aiohttp import TCPConnector
from sanic.testing import SanicTestClient, HOST, PORT
class DelayableTCPConnector(TCPConnector):
class DelayableHttpRequest(object):
def __new__(cls, req, delay):
cls = super(DelayableTCPConnector.DelayableHttpRequest, cls).\
__new__(cls)
cls.req = req
cls.delay = delay
return cls
def __getattr__(self, item):
return getattr(self.req, item)
def send(self, *args, **kwargs):
if self.delay and self.delay > 0:
_ = yield from asyncio.sleep(self.delay)
self.req.send(*args, **kwargs)
def __init__(self, *args, **kwargs):
_post_connect_delay = kwargs.pop('post_connect_delay', 0)
_pre_request_delay = kwargs.pop('pre_request_delay', 0)
super(DelayableTCPConnector, self).__init__(*args, **kwargs)
self._post_connect_delay = _post_connect_delay
self._pre_request_delay = _pre_request_delay
@asyncio.coroutine
def connect(self, req):
req = DelayableTCPConnector.\
DelayableHttpRequest(req, self._pre_request_delay)
conn = yield from super(DelayableTCPConnector, self).connect(req)
if self._post_connect_delay and self._post_connect_delay > 0:
_ = yield from asyncio.sleep(self._post_connect_delay)
return conn
class DelayableSanicTestClient(SanicTestClient):
def __init__(self, app, request_delay=1):
super(DelayableSanicTestClient, self).__init__(app)
self._request_delay = request_delay
async def _local_request(self, method, uri, cookies=None, *args,
**kwargs):
if uri.startswith(('http:', 'https:', 'ftp:', 'ftps://' '//')):
url = uri
else:
url = 'http://{host}:{port}{uri}'.format(
host=HOST, port=PORT, uri=uri)
conn = DelayableTCPConnector(pre_request_delay=self._request_delay,
verify_ssl=False)
async with aiohttp.ClientSession(
cookies=cookies, connector=conn) as session:
# Insert a delay after creating the connection
# But before sending the request.
async with getattr(session, method.lower())(
url, *args, **kwargs) as response:
try:
response.text = await response.text()
except UnicodeDecodeError:
response.text = None
try:
response.json = await response.json()
except (JSONDecodeError,
UnicodeDecodeError,
aiohttp.ClientResponseError):
response.json = None
response.body = await response.read()
return response
Config.REQUEST_TIMEOUT = 1 Config.REQUEST_TIMEOUT = 1
request_timeout_app = Sanic('test_request_timeout')
request_timeout_default_app = Sanic('test_request_timeout_default') request_timeout_default_app = Sanic('test_request_timeout_default')
@request_timeout_app.route('/1')
async def handler_1(request):
await asyncio.sleep(2)
return text('OK')
@request_timeout_app.exception(RequestTimeout)
def handler_exception(request, exception):
return text('Request Timeout from error_handler.', 408)
def test_server_error_request_timeout():
request, response = request_timeout_app.test_client.get('/1')
assert response.status == 408
assert response.text == 'Request Timeout from error_handler.'
@request_timeout_default_app.route('/1') @request_timeout_default_app.route('/1')
async def handler_2(request): async def handler(request):
await asyncio.sleep(2)
return text('OK') return text('OK')
def test_default_server_error_request_timeout(): def test_default_server_error_request_timeout():
request, response = request_timeout_default_app.test_client.get('/1') client = DelayableSanicTestClient(request_timeout_default_app, 2)
request, response = client.get('/1')
assert response.status == 408 assert response.status == 408
assert response.text == 'Error: Request Timeout' assert response.text == 'Error: Request Timeout'

View File

@ -0,0 +1,38 @@
from sanic import Sanic
import asyncio
from sanic.response import text
from sanic.exceptions import ServiceUnavailable
from sanic.config import Config
Config.RESPONSE_TIMEOUT = 1
response_timeout_app = Sanic('test_response_timeout')
response_timeout_default_app = Sanic('test_response_timeout_default')
@response_timeout_app.route('/1')
async def handler_1(request):
await asyncio.sleep(2)
return text('OK')
@response_timeout_app.exception(ServiceUnavailable)
def handler_exception(request, exception):
return text('Response Timeout from error_handler.', 503)
def test_server_error_response_timeout():
request, response = response_timeout_app.test_client.get('/1')
assert response.status == 503
assert response.text == 'Response Timeout from error_handler.'
@response_timeout_default_app.route('/1')
async def handler_2(request):
await asyncio.sleep(2)
return text('OK')
def test_default_server_error_response_timeout():
request, response = response_timeout_default_app.test_client.get('/1')
assert response.status == 503
assert response.text == 'Error: Response Timeout'