sanic/sanic/server.py

322 lines
10 KiB
Python
Raw Normal View History

2016-10-15 20:59:00 +01:00
import asyncio
from functools import partial
2016-10-15 20:59:00 +01:00
from inspect import isawaitable
2016-11-20 02:41:40 +00:00
from multidict import CIMultiDict
2016-10-15 20:59:00 +01:00
from signal import SIGINT, SIGTERM
2016-10-28 11:13:03 +01:00
from time import time
from httptools import HttpRequestParser
from httptools.parser.errors import HttpParserError
from .exceptions import ServerError
2016-10-15 20:59:00 +01:00
try:
import uvloop as async_loop
2016-10-16 14:01:59 +01:00
except ImportError:
2016-10-15 20:59:00 +01:00
async_loop = asyncio
from .log import log
from .request import Request
2016-12-18 00:25:39 +00:00
from .exceptions import RequestTimeout, PayloadTooLarge, InvalidUsage
2016-10-15 20:59:00 +01:00
class Signal:
stopped = False
2016-10-28 11:35:30 +01:00
current_time = None
2016-10-15 20:59:00 +01:00
class HttpProtocol(asyncio.Protocol):
2016-10-16 14:01:59 +01:00
__slots__ = (
# event loop, connection
'loop', 'transport', 'connections', 'signal',
# request params
'parser', 'request', 'url', 'headers',
# request config
'request_handler', 'request_timeout', 'request_max_size',
# connection management
'_total_request_size', '_timeout_handler', '_last_communication_time')
2016-10-16 14:01:59 +01:00
def __init__(self, *, loop, request_handler, error_handler,
signal=Signal(), connections={}, request_timeout=60,
2016-10-15 20:59:00 +01:00
request_max_size=None):
self.loop = loop
self.transport = None
self.request = None
self.parser = None
self.url = None
self.headers = None
self.signal = signal
self.connections = connections
self.request_handler = request_handler
self.error_handler = error_handler
2016-10-15 20:59:00 +01:00
self.request_timeout = request_timeout
self.request_max_size = request_max_size
self._total_request_size = 0
self._timeout_handler = None
self._last_request_time = None
self._request_handler_task = None
2016-10-15 20:59:00 +01:00
2016-10-27 15:09:36 +01:00
# -------------------------------------------- #
2016-10-15 20:59:00 +01:00
# Connection
# -------------------------------------------- #
def connection_made(self, transport):
self.connections.add(self)
2016-10-16 14:01:59 +01:00
self._timeout_handler = self.loop.call_later(
self.request_timeout, self.connection_timeout)
2016-10-15 20:59:00 +01:00
self.transport = transport
self._last_request_time = current_time
2016-10-15 20:59:00 +01:00
def connection_lost(self, exc):
self.connections.discard(self)
2016-10-15 20:59:00 +01:00
self._timeout_handler.cancel()
self.cleanup()
def connection_timeout(self):
# Check if
time_elapsed = current_time - self._last_request_time
if time_elapsed < self.request_timeout:
time_left = self.request_timeout - time_elapsed
self._timeout_handler = \
self.loop.call_later(time_left, self.connection_timeout)
else:
2016-11-26 07:47:16 +00:00
if self._request_handler_task:
self._request_handler_task.cancel()
exception = RequestTimeout('Request Timeout')
self.write_error(exception)
2016-10-15 20:59:00 +01:00
2016-10-27 15:09:36 +01:00
# -------------------------------------------- #
2016-10-15 20:59:00 +01:00
# Parsing
# -------------------------------------------- #
def data_received(self, data):
2016-10-16 14:01:59 +01:00
# Check for the request itself getting too large and exceeding
# memory limits
2016-10-15 20:59:00 +01:00
self._total_request_size += len(data)
if self._total_request_size > self.request_max_size:
exception = PayloadTooLarge('Payload Too Large')
self.write_error(exception)
2016-10-15 20:59:00 +01:00
# Create parser if this is the first time we're receiving data
if self.parser is None:
assert self.request is None
self.headers = []
self.parser = HttpRequestParser(self)
2016-10-15 20:59:00 +01:00
# Parse request chunk or close connection
try:
self.parser.feed_data(data)
2016-12-18 00:25:39 +00:00
except HttpParserError:
exception = InvalidUsage('Bad Request')
self.write_error(exception)
2016-10-15 20:59:00 +01:00
def on_url(self, url):
self.url = url
def on_header(self, name, value):
if name == b'Content-Length' and int(value) > self.request_max_size:
exception = PayloadTooLarge('Payload Too Large')
self.write_error(exception)
2016-10-15 20:59:00 +01:00
self.headers.append((name.decode(), value.decode('utf-8')))
def on_headers_complete(self):
remote_addr = self.transport.get_extra_info('peername')
if remote_addr:
self.headers.append(('Remote-Addr', '%s:%s' % remote_addr))
2016-11-10 12:06:27 +00:00
2016-10-15 20:59:00 +01:00
self.request = Request(
url_bytes=self.url,
headers=CIMultiDict(self.headers),
2016-10-15 20:59:00 +01:00
version=self.parser.get_http_version(),
method=self.parser.get_method().decode()
)
def on_body(self, body):
2016-10-23 11:30:13 +01:00
if self.request.body:
self.request.body += body
else:
self.request.body = body
2016-10-15 20:59:00 +01:00
def on_message_complete(self):
self._request_handler_task = self.loop.create_task(
2016-10-16 14:01:59 +01:00
self.request_handler(self.request, self.write_response))
2016-10-15 20:59:00 +01:00
# -------------------------------------------- #
# Responding
# -------------------------------------------- #
def write_response(self, response):
try:
2016-10-18 07:34:07 +01:00
keep_alive = self.parser.should_keep_alive() \
and not self.signal.stopped
2016-10-16 14:01:59 +01:00
self.transport.write(
response.output(
self.request.version, keep_alive, self.request_timeout))
2016-10-15 20:59:00 +01:00
if not keep_alive:
self.transport.close()
else:
# Record that we received data
self._last_request_time = current_time
2016-10-15 20:59:00 +01:00
self.cleanup()
except Exception as e:
2016-10-16 14:01:59 +01:00
self.bail_out(
"Writing response failed, connection closed {}".format(e))
2016-10-15 20:59:00 +01:00
def write_error(self, exception):
try:
response = self.error_handler.response(self.request, exception)
version = self.request.version if self.request else '1.1'
self.transport.write(response.output(version))
self.transport.close()
except Exception as e:
self.bail_out(
"Writing error failed, connection closed {}".format(e))
2016-10-15 20:59:00 +01:00
def bail_out(self, message):
exception = ServerError(message)
self.write_error(exception)
log.error(message)
2016-10-15 20:59:00 +01:00
def cleanup(self):
self.parser = None
self.request = None
self.url = None
self.headers = None
self._request_handler_task = None
2016-10-15 20:59:00 +01:00
self._total_request_size = 0
def close_if_idle(self):
"""
Close the connection if a request is not being sent or received
:return: boolean - True if closed, false if staying open
"""
if not self.parser:
self.transport.close()
return True
return False
def update_current_time(loop):
"""
Caches the current time, since it is needed
at the end of every keep-alive request to update the request timeout time
:param loop:
:return:
"""
global current_time
2016-10-28 11:13:03 +01:00
current_time = time()
loop.call_later(1, partial(update_current_time, loop))
2016-10-15 20:59:00 +01:00
def trigger_events(events, loop):
"""
:param events: one or more sync or async functions to execute
:param loop: event loop
"""
if events:
if not isinstance(events, list):
events = [events]
for event in events:
result = event(loop)
if isawaitable(result):
loop.run_until_complete(result)
2016-12-24 02:40:07 +00:00
def serve(host, port, request_handler, error_handler, before_start=None,
after_start=None, before_stop=None, after_stop=None, debug=False,
request_timeout=60, sock=None, request_max_size=None,
2017-01-04 02:35:11 +00:00
reuse_port=False, loop=None, protocol=HttpProtocol, backlog=100):
"""
Starts asynchronous HTTP Server on an individual process.
:param host: Address to host on
:param port: Port to host on
:param request_handler: Sanic request handler with middleware
2016-12-24 02:40:07 +00:00
:param error_handler: Sanic error handler with middleware
:param before_start: Function to be executed before the server starts
listening. Takes single argument `loop`
:param after_start: Function to be executed after the server starts
listening. Takes single argument `loop`
:param before_stop: Function to be executed when a stop signal is
received before it is respected. Takes single argumenet `loop`
2016-12-24 02:40:07 +00:00
:param after_stop: Function to be executed when a stop signal is
received after it is respected. Takes single argumenet `loop`
:param debug: Enables debug output (slows server)
:param request_timeout: time in seconds
:param sock: Socket for the server to accept connections from
:param request_max_size: size in bytes, `None` for no limit
:param reuse_port: `True` for multiple workers
:param loop: asyncio compatible event loop
2016-12-29 04:11:27 +00:00
:param protocol: Subclass of asyncio protocol class
:return: Nothing
"""
loop = loop or async_loop.new_event_loop()
2016-10-15 20:59:00 +01:00
asyncio.set_event_loop(loop)
if debug:
loop.set_debug(debug)
2016-10-15 20:59:00 +01:00
trigger_events(before_start, loop)
connections = set()
2016-10-15 20:59:00 +01:00
signal = Signal()
server = partial(
2016-12-22 15:13:38 +00:00
protocol,
2016-10-15 20:59:00 +01:00
loop=loop,
connections=connections,
signal=signal,
request_handler=request_handler,
error_handler=error_handler,
2016-10-15 20:59:00 +01:00
request_timeout=request_timeout,
request_max_size=request_max_size,
)
server_coroutine = loop.create_server(
server,
host,
port,
reuse_port=reuse_port,
2017-01-04 02:35:11 +00:00
sock=sock,
backlog=backlog
)
# Instead of pulling time at the end of every request,
# pull it once per minute
loop.call_soon(partial(update_current_time, loop))
2016-10-15 20:59:00 +01:00
try:
http_server = loop.run_until_complete(server_coroutine)
except Exception:
log.exception("Unable to start server")
2016-10-15 20:59:00 +01:00
return
trigger_events(after_start, loop)
2016-10-15 20:59:00 +01:00
# Register signals for graceful termination
for _signal in (SIGINT, SIGTERM):
loop.add_signal_handler(_signal, loop.stop)
try:
loop.run_forever()
finally:
log.info("Stop requested, draining connections...")
# Run the on_stop function if provided
trigger_events(before_stop, loop)
2016-10-15 20:59:00 +01:00
# Wait for event loop to finish and all connections to drain
http_server.close()
loop.run_until_complete(http_server.wait_closed())
# Complete all tasks on the loop
signal.stopped = True
for connection in connections:
2016-10-15 20:59:00 +01:00
connection.close_if_idle()
while connections:
loop.run_until_complete(asyncio.sleep(0.1))
trigger_events(after_stop, loop)
2016-10-15 20:59:00 +01:00
loop.close()