sanic/sanic/server.py

870 lines
29 KiB
Python
Raw Normal View History

2016-10-15 20:59:00 +01:00
import asyncio
import os
import traceback
2018-10-18 05:20:16 +01:00
from functools import partial
2016-10-15 20:59:00 +01:00
from inspect import isawaitable
2017-02-27 00:31:39 +00:00
from multiprocessing import Process
2018-10-18 05:20:16 +01:00
from signal import SIG_IGN, SIGINT, SIGTERM, Signals
from signal import signal as signal_func
from socket import SO_REUSEADDR, SOL_SOCKET, socket
2016-10-28 11:13:03 +01:00
from time import time
from httptools import HttpRequestParser
from httptools.parser.errors import HttpParserError
from multidict import CIMultiDict
2018-10-18 05:20:16 +01:00
from sanic.exceptions import (
InvalidUsage,
PayloadTooLarge,
RequestTimeout,
ServerError,
ServiceUnavailable,
)
from sanic.log import access_logger, logger
from sanic.request import Request, StreamBuffer
2018-10-18 05:20:16 +01:00
from sanic.response import HTTPResponse
2016-10-15 20:59:00 +01:00
try:
2018-01-15 19:23:49 +00:00
import uvloop
2018-10-14 01:55:33 +01:00
2018-01-15 19:23:49 +00:00
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
2016-10-16 14:01:59 +01:00
except ImportError:
2018-01-15 19:23:49 +00:00
pass
2016-10-15 20:59:00 +01:00
2017-01-17 00:12:42 +00:00
current_time = None
2016-10-15 20:59:00 +01:00
class Signal:
    """Shared shutdown flag: serve() hands one instance to every protocol
    so a server-wide stop can be observed by each open connection."""

    # Set to True by serve() during shutdown; the keep_alive property
    # checks it so no connection is kept open once the server is stopping.
    stopped = False
class HttpProtocol(asyncio.Protocol):
    """
    This class provides a basic HTTP implementation of the sanic framework.

    One instance is created per TCP connection; it drives the httptools
    parser, enforces the request/response/keep-alive timeouts, and invokes
    the Sanic request handler.
    """

    # __slots__ keeps per-connection memory low: one instance exists for
    # every open connection, so the per-instance __dict__ is worth removing.
    __slots__ = (
        # event loop, connection
        "loop",
        "transport",
        "connections",
        "signal",
        # request params
        "parser",
        "request",
        "url",
        "headers",
        # request config
        "request_handler",
        "request_timeout",
        "response_timeout",
        "keep_alive_timeout",
        "request_max_size",
        "request_buffer_queue_size",
        "request_class",
        "is_request_stream",
        "router",
        "error_handler",
        # enable or disable access log purpose
        "access_log",
        # connection management
        "_total_request_size",
        "_request_timeout_handler",
        "_response_timeout_handler",
        "_keep_alive_timeout_handler",
        "_last_request_time",
        "_last_response_time",
        "_is_stream_handler",
        "_not_paused",
        "_request_handler_task",
        "_request_stream_task",
        "_keep_alive",
        "_header_fragment",
        "state",
        "_debug",
    )
    def __init__(
        self,
        *,
        loop,
        request_handler,
        error_handler,
        # NOTE(review): mutable default — every protocol that is not given
        # an explicit signal shares this one Signal() instance. That appears
        # deliberate (a server-wide stop flag), but confirm.
        signal=Signal(),
        connections=None,
        request_timeout=60,
        response_timeout=60,
        keep_alive_timeout=5,
        request_max_size=None,
        request_buffer_queue_size=100,
        request_class=None,
        access_log=True,
        keep_alive=True,
        is_request_stream=False,
        router=None,
        state=None,
        debug=False,
        **kwargs
    ):
        """Initialize per-connection state; called once per accepted socket."""
        self.loop = loop
        self.transport = None
        self.request = None
        self.parser = None
        self.url = None
        self.headers = None
        self.router = router
        self.signal = signal
        self.access_log = access_log
        self.connections = connections or set()
        self.request_handler = request_handler
        self.error_handler = error_handler
        self.request_timeout = request_timeout
        self.request_buffer_queue_size = request_buffer_queue_size
        self.response_timeout = response_timeout
        self.keep_alive_timeout = keep_alive_timeout
        self.request_max_size = request_max_size
        self.request_class = request_class or Request
        self.is_request_stream = is_request_stream
        self._is_stream_handler = False
        # Used by pause_writing/resume_writing to apply write backpressure.
        self._not_paused = asyncio.Event(loop=loop)
        self._total_request_size = 0
        self._request_timeout_handler = None
        self._response_timeout_handler = None
        self._keep_alive_timeout_handler = None
        self._last_request_time = None
        self._last_response_time = None
        self._request_handler_task = None
        self._request_stream_task = None
        self._keep_alive = keep_alive
        # Partial header name carried over between on_header callbacks.
        self._header_fragment = b""
        # Shared dict (e.g. across a worker's connections) tracking counters.
        self.state = state if state else {}
        if "requests_count" not in self.state:
            self.state["requests_count"] = 0
        self._debug = debug
        self._not_paused.set()
2016-10-15 20:59:00 +01:00
2017-04-12 09:55:22 +01:00
@property
def keep_alive(self):
"""
Check if the connection needs to be kept alive based on the params
attached to the `_keep_alive` attribute, :attr:`Signal.stopped`
and :func:`HttpProtocol.parser.should_keep_alive`
:return: ``True`` if connection is to be kept alive ``False`` else
"""
2017-06-26 13:05:23 +01:00
return (
2018-10-14 01:55:33 +01:00
self._keep_alive
and not self.signal.stopped
and self.parser.should_keep_alive()
)
2017-04-12 09:55:22 +01:00
2016-10-27 15:09:36 +01:00
    # -------------------------------------------- #
    # Connection
    # -------------------------------------------- #

    def connection_made(self, transport):
        """asyncio.Protocol callback: a client connected.

        Registers the connection for shutdown bookkeeping and arms the
        request timeout (time allowed to receive the full request).
        """
        self.connections.add(self)
        self._request_timeout_handler = self.loop.call_later(
            self.request_timeout, self.request_timeout_callback
        )
        self.transport = transport
        # current_time is the module-level cached clock (refreshed by
        # update_current_time), not a live time() call.
        self._last_request_time = current_time
2016-10-15 20:59:00 +01:00
def connection_lost(self, exc):
self.connections.discard(self)
2018-10-12 06:38:26 +01:00
if self._request_handler_task:
self._request_handler_task.cancel()
if self._request_stream_task:
self._request_stream_task.cancel()
if self._request_timeout_handler:
self._request_timeout_handler.cancel()
if self._response_timeout_handler:
self._response_timeout_handler.cancel()
if self._keep_alive_timeout_handler:
self._keep_alive_timeout_handler.cancel()
    def pause_writing(self):
        # Transport write buffer crossed the high-water mark: make drain()
        # block until resume_writing() is called.
        self._not_paused.clear()

    def resume_writing(self):
        # Buffer drained below the low-water mark: unblock drain().
        self._not_paused.set()
    def request_timeout_callback(self):
        # See the docstring in the RequestTimeout exception, to see
        # exactly what this timeout is checking for.
        # Check if elapsed time since request initiated exceeds our
        # configured maximum request timeout value
        time_elapsed = current_time - self._last_request_time
        if time_elapsed < self.request_timeout:
            # Not expired yet — current_time is only refreshed once a
            # second, so re-arm the timer for the remaining window.
            time_left = self.request_timeout - time_elapsed
            self._request_timeout_handler = self.loop.call_later(
                time_left, self.request_timeout_callback
            )
        else:
            # Timed out: cancel any in-flight work, then respond 408.
            if self._request_stream_task:
                self._request_stream_task.cancel()
            if self._request_handler_task:
                self._request_handler_task.cancel()
            self.write_error(RequestTimeout("Request Timeout"))
2016-10-15 20:59:00 +01:00
    def response_timeout_callback(self):
        # Check if elapsed time since response was initiated exceeds our
        # configured maximum request timeout value
        # NOTE(review): elapsed time is measured from _last_request_time,
        # which execute_request_handler refreshes when the handler starts.
        time_elapsed = current_time - self._last_request_time
        if time_elapsed < self.response_timeout:
            # Not expired yet; re-arm for the remaining window.
            time_left = self.response_timeout - time_elapsed
            self._response_timeout_handler = self.loop.call_later(
                time_left, self.response_timeout_callback
            )
        else:
            # Handler took too long: cancel it and respond 503.
            if self._request_stream_task:
                self._request_stream_task.cancel()
            if self._request_handler_task:
                self._request_handler_task.cancel()
            self.write_error(ServiceUnavailable("Response Timeout"))
    def keep_alive_timeout_callback(self):
        """
        Check if elapsed time since last response exceeds our configured
        maximum keep alive timeout value and if so, close the transport
        pipe and let the response writer handle the error.

        :return: None
        """
        time_elapsed = current_time - self._last_response_time
        if time_elapsed < self.keep_alive_timeout:
            # Connection has not been idle long enough; re-arm the timer.
            time_left = self.keep_alive_timeout - time_elapsed
            self._keep_alive_timeout_handler = self.loop.call_later(
                time_left, self.keep_alive_timeout_callback
            )
        else:
            logger.debug("KeepAlive Timeout. Closing connection.")
            self.transport.close()
            self.transport = None
2016-10-27 15:09:36 +01:00
    # -------------------------------------------- #
    # Parsing
    # -------------------------------------------- #

    def data_received(self, data):
        """asyncio.Protocol callback: raw bytes arrived on the socket."""
        # Check for the request itself getting too large and exceeding
        # memory limits
        self._total_request_size += len(data)
        if self._total_request_size > self.request_max_size:
            # NOTE(review): there is no `return` here, so the oversized
            # chunk still reaches the parser below; write_error does close
            # the transport, but confirm the fall-through is intended.
            self.write_error(PayloadTooLarge("Payload Too Large"))

        # Create parser if this is the first time we're receiving data
        # (cleanup() resets it to None between keep-alive requests, so
        # this branch runs once per request, not once per connection).
        if self.parser is None:
            assert self.request is None
            self.headers = []
            self.parser = HttpRequestParser(self)

            # requests count
            self.state["requests_count"] = self.state["requests_count"] + 1

        # Parse request chunk or close connection
        try:
            self.parser.feed_data(data)
        except HttpParserError:
            message = "Bad Request"
            if self._debug:
                # Include the parser traceback only in debug mode.
                message += "\n" + traceback.format_exc()
            self.write_error(InvalidUsage(message))
2016-10-15 20:59:00 +01:00
def on_url(self, url):
2017-05-30 15:13:49 +01:00
if not self.url:
self.url = url
else:
self.url += url
2016-10-15 20:59:00 +01:00
def on_header(self, name, value):
2017-05-27 15:28:57 +01:00
self._header_fragment += name
if value is not None:
2018-10-14 01:55:33 +01:00
if (
self._header_fragment == b"Content-Length"
and int(value) > self.request_max_size
):
self.write_error(PayloadTooLarge("Payload Too Large"))
try:
value = value.decode()
except UnicodeDecodeError:
2018-10-14 01:55:33 +01:00
value = value.decode("latin_1")
2017-05-28 17:46:07 +01:00
self.headers.append(
2018-10-14 01:55:33 +01:00
(self._header_fragment.decode().casefold(), value)
)
2017-05-27 15:28:57 +01:00
2018-10-14 01:55:33 +01:00
self._header_fragment = b""
2016-10-15 20:59:00 +01:00
    def on_headers_complete(self):
        """httptools callback: all headers parsed — build the Request."""
        self.request = self.request_class(
            url_bytes=self.url,
            headers=CIMultiDict(self.headers),
            version=self.parser.get_http_version(),
            method=self.parser.get_method().decode(),
            transport=self.transport,
        )
        # Remove any existing KeepAlive handler here,
        # It will be recreated if required on the new request.
        if self._keep_alive_timeout_handler:
            self._keep_alive_timeout_handler.cancel()
            self._keep_alive_timeout_handler = None
        if self.is_request_stream:
            self._is_stream_handler = self.router.is_stream_handler(
                self.request
            )
            if self._is_stream_handler:
                # Streaming handlers start running now, consuming the body
                # through the StreamBuffer as it arrives.
                self.request.stream = StreamBuffer(
                    self.request_buffer_queue_size
                )
                self.execute_request_handler()
2016-10-15 20:59:00 +01:00
    def on_body(self, body):
        """httptools callback: a chunk of the request body arrived."""
        if self.is_request_stream and self._is_stream_handler:
            # Streaming handler: feed the chunk through the stream buffer
            # (with backpressure) rather than accumulating it in memory.
            self._request_stream_task = self.loop.create_task(
                self.body_append(body)
            )
        else:
            self.request.body_push(body)
2016-10-15 20:59:00 +01:00
async def body_append(self, body):
if self.request.stream.is_full():
self.transport.pause_reading()
await self.request.stream.put(body)
self.transport.resume_reading()
else:
await self.request.stream.put(body)
2016-10-15 20:59:00 +01:00
    def on_message_complete(self):
        # Entire request (headers and whole body) is received.
        # We can cancel and remove the request timeout handler now.
        if self._request_timeout_handler:
            self._request_timeout_handler.cancel()
            self._request_timeout_handler = None
        if self.is_request_stream and self._is_stream_handler:
            # Streaming handler is already running; enqueue None as the
            # end-of-body sentinel and return without re-invoking it.
            self._request_stream_task = self.loop.create_task(
                self.request.stream.put(None)
            )
            return
        self.request.body_finish()
        self.execute_request_handler()
2017-02-21 16:05:06 +00:00
2017-05-05 12:09:32 +01:00
    def execute_request_handler(self):
        """
        Invoke the request handler defined by the
        :func:`sanic.app.Sanic.handle_request` method

        :return: None
        """
        # From here on the response timeout applies (the request timeout
        # was cancelled in on_message_complete).
        self._response_timeout_handler = self.loop.call_later(
            self.response_timeout, self.response_timeout_callback
        )
        self._last_request_time = current_time
        self._request_handler_task = self.loop.create_task(
            self.request_handler(
                self.request, self.write_response, self.stream_response
            )
        )
2016-10-15 20:59:00 +01:00
    # -------------------------------------------- #
    # Responding
    # -------------------------------------------- #
    def log_response(self, response):
        """
        Helper method provided to enable the logging of responses in case if
        the :attr:`HttpProtocol.access_log` is enabled.

        :param response: Response generated for the current request
        :type response: :class:`sanic.response.HTTPResponse` or
            :class:`sanic.response.StreamingHTTPResponse`
        :return: None
        """
        if self.access_log:
            extra = {"status": getattr(response, "status", 0)}

            if isinstance(response, HTTPResponse):
                extra["byte"] = len(response.body)
            else:
                # Streamed responses have no buffered body to measure.
                extra["byte"] = -1

            extra["host"] = "UNKNOWN"
            if self.request is not None:
                if self.request.ip:
                    extra["host"] = "{0}:{1}".format(
                        self.request.ip, self.request.port
                    )

                extra["request"] = "{0} {1}".format(
                    self.request.method, self.request.url
                )
            else:
                # e.g. an error before headers completed: no Request object.
                extra["request"] = "nil"

            access_logger.info("", extra=extra)
2017-10-04 11:50:57 +01:00
2016-10-15 20:59:00 +01:00
def write_response(self, response):
2017-02-21 16:28:45 +00:00
"""
Writes response content synchronously to the transport.
"""
if self._response_timeout_handler:
self._response_timeout_handler.cancel()
self._response_timeout_handler = None
2016-10-15 20:59:00 +01:00
try:
2017-04-12 09:55:22 +01:00
keep_alive = self.keep_alive
2016-10-16 14:01:59 +01:00
self.transport.write(
response.output(
2018-10-14 01:55:33 +01:00
self.request.version, keep_alive, self.keep_alive_timeout
)
)
2017-10-04 11:50:57 +01:00
self.log_response(response)
2017-02-21 16:28:45 +00:00
except AttributeError:
2018-10-14 01:55:33 +01:00
logger.error(
"Invalid response object for url %s, "
"Expected Type: HTTPResponse, Actual Type: %s",
self.url,
type(response),
)
self.write_error(ServerError("Invalid response type"))
2017-02-21 16:28:45 +00:00
except RuntimeError:
if self._debug:
2018-10-14 01:55:33 +01:00
logger.error(
"Connection lost before response written @ %s",
self.request.ip,
)
keep_alive = False
2017-02-21 16:28:45 +00:00
except Exception as e:
self.bail_out(
2018-10-14 01:55:33 +01:00
"Writing response failed, connection closed {}".format(repr(e))
)
2017-02-21 16:28:45 +00:00
finally:
if not keep_alive:
self.transport.close()
self.transport = None
2017-02-21 16:05:06 +00:00
else:
self._keep_alive_timeout_handler = self.loop.call_later(
2018-10-14 01:55:33 +01:00
self.keep_alive_timeout, self.keep_alive_timeout_callback
)
self._last_response_time = current_time
2017-02-21 16:28:45 +00:00
self.cleanup()
    async def drain(self):
        # Wait until the transport's write buffer is below the high-water
        # mark again (see pause_writing / resume_writing).
        await self._not_paused.wait()

    def push_data(self, data):
        # Raw write hook used by streaming responses.
        self.transport.write(data)
2017-02-21 16:28:45 +00:00
async def stream_response(self, response):
"""
Streams a response to the client asynchronously. Attaches
the transport to the response so the response consumer can
write to the response as needed.
"""
if self._response_timeout_handler:
self._response_timeout_handler.cancel()
self._response_timeout_handler = None
2017-02-21 16:28:45 +00:00
try:
2017-04-12 09:55:22 +01:00
keep_alive = self.keep_alive
response.protocol = self
2017-02-21 16:28:45 +00:00
await response.stream(
2018-10-14 01:55:33 +01:00
self.request.version, keep_alive, self.keep_alive_timeout
)
2017-10-04 11:50:57 +01:00
self.log_response(response)
except AttributeError:
2018-10-14 01:55:33 +01:00
logger.error(
"Invalid response object for url %s, "
"Expected Type: HTTPResponse, Actual Type: %s",
self.url,
type(response),
)
self.write_error(ServerError("Invalid response type"))
except RuntimeError:
if self._debug:
2018-10-14 01:55:33 +01:00
logger.error(
"Connection lost before response written @ %s",
self.request.ip,
)
keep_alive = False
2016-10-15 20:59:00 +01:00
except Exception as e:
2016-10-16 14:01:59 +01:00
self.bail_out(
2018-10-14 01:55:33 +01:00
"Writing response failed, connection closed {}".format(repr(e))
)
finally:
2016-10-15 20:59:00 +01:00
if not keep_alive:
self.transport.close()
self.transport = None
2016-10-15 20:59:00 +01:00
else:
self._keep_alive_timeout_handler = self.loop.call_later(
2018-10-14 01:55:33 +01:00
self.keep_alive_timeout, self.keep_alive_timeout_callback
)
self._last_response_time = current_time
2016-10-15 20:59:00 +01:00
self.cleanup()
    def write_error(self, exception):
        # An error _is_ a response.
        # Don't throw a response timeout, when a response _is_ given.
        if self._response_timeout_handler:
            self._response_timeout_handler.cancel()
            self._response_timeout_handler = None
        response = None
        try:
            # Let the app's error handler render the exception.
            response = self.error_handler.response(self.request, exception)
            # Errors can occur before a Request exists (e.g. parse errors).
            version = self.request.version if self.request else "1.1"
            self.transport.write(response.output(version))
        except RuntimeError:
            if self._debug:
                logger.error(
                    "Connection lost before error written @ %s",
                    self.request.ip if self.request else "Unknown",
                )
        except Exception as e:
            # from_error=True prevents bail_out from re-entering write_error.
            self.bail_out(
                "Writing error failed, connection closed {}".format(repr(e)),
                from_error=True,
            )
        finally:
            # Log only errors that belong to a parsed request; 408 is logged
            # even when keep-alive is off because the request timed out.
            if self.parser and (
                self.keep_alive or getattr(response, "status", 0) == 408
            ):
                self.log_response(response)
            try:
                self.transport.close()
            except AttributeError:
                # transport may already be None if the peer vanished.
                logger.debug("Connection lost before server could close it.")
2016-10-15 20:59:00 +01:00
    def bail_out(self, message, from_error=False):
        """
        In case if the transport pipes are closed and the sanic app encounters
        an error while writing data to the transport pipe, we log the error
        with proper details.

        :param message: Error message to display
        :param from_error: If the bail out was invoked while handling an
            exception scenario.

        :type message: str
        :type from_error: bool

        :return: None
        """
        if from_error or self.transport.is_closing():
            # Already failing (or socket gone): just log; writing another
            # error response would fail or recurse.
            logger.error(
                "Transport closed @ %s and exception "
                "experienced during error handling",
                self.transport.get_extra_info("peername"),
            )
            logger.debug("Exception:", exc_info=True)
        else:
            self.write_error(ServerError(message))
            logger.error(message)
2016-10-15 20:59:00 +01:00
def cleanup(self):
"""This is called when KeepAlive feature is used,
it resets the connection in order for it to be able
to handle receiving another request on the same connection."""
2016-10-15 20:59:00 +01:00
self.parser = None
self.request = None
self.url = None
self.headers = None
self._request_handler_task = None
2017-05-05 12:09:32 +01:00
self._request_stream_task = None
2016-10-15 20:59:00 +01:00
self._total_request_size = 0
2017-05-05 12:09:32 +01:00
self._is_stream_handler = False
2016-10-15 20:59:00 +01:00
def close_if_idle(self):
"""Close the connection if a request is not being sent or received
2016-10-15 20:59:00 +01:00
:return: boolean - True if closed, false if staying open
"""
if not self.parser:
self.transport.close()
return True
return False
def close(self):
"""
Force close the connection.
"""
if self.transport is not None:
self.transport.close()
self.transport = None
2016-10-15 20:59:00 +01:00
def update_current_time(loop):
    """Refresh the cached wall-clock time and reschedule in one second.

    The cached value is read by the timeout callbacks instead of calling
    ``time()`` on every request.

    :param loop: event loop used to reschedule this function
    :return: None
    """
    global current_time
    current_time = time()
    # call_later forwards extra positional args to the callback, so no
    # functools.partial wrapper is needed.
    loop.call_later(1, update_current_time, loop)
2016-10-15 20:59:00 +01:00
def trigger_events(events, loop):
    """Trigger event callbacks (functions or async)

    :param events: one or more sync or async functions to execute, or
        ``None`` / empty for a no-op (``serve`` passes its ``before_*`` /
        ``after_*`` hooks straight through, and they default to ``None``)
    :param loop: event loop, used to run any awaitable results to completion
    :return: None
    """
    # ``events or ()`` keeps the historical list behavior while making a
    # None argument a harmless no-op instead of a TypeError.
    for event in events or ():
        result = event(loop)
        if isawaitable(result):
            loop.run_until_complete(result)
2018-10-14 01:55:33 +01:00
def serve(
    host,
    port,
    request_handler,
    error_handler,
    before_start=None,
    after_start=None,
    before_stop=None,
    after_stop=None,
    debug=False,
    request_timeout=60,
    response_timeout=60,
    keep_alive_timeout=5,
    ssl=None,
    sock=None,
    request_max_size=None,
    request_buffer_queue_size=100,
    reuse_port=False,
    loop=None,
    protocol=HttpProtocol,
    backlog=100,
    register_sys_signals=True,
    run_multiple=False,
    run_async=False,
    connections=None,
    signal=Signal(),
    request_class=None,
    access_log=True,
    keep_alive=True,
    is_request_stream=False,
    router=None,
    websocket_max_size=None,
    websocket_max_queue=None,
    websocket_read_limit=2 ** 16,
    websocket_write_limit=2 ** 16,
    state=None,
    graceful_shutdown_timeout=15.0,
):
    """Start asynchronous HTTP Server on an individual process.

    :param host: Address to host on
    :param port: Port to host on
    :param request_handler: Sanic request handler with middleware
    :param error_handler: Sanic error handler with middleware
    :param before_start: function to be executed before the server starts
                         listening. Takes arguments `app` instance and `loop`
    :param after_start: function to be executed after the server starts
                        listening. Takes arguments `app` instance and `loop`
    :param before_stop: function to be executed when a stop signal is
                        received before it is respected. Takes arguments
                        `app` instance and `loop`
    :param after_stop: function to be executed when a stop signal is
                       received after it is respected. Takes arguments
                       `app` instance and `loop`
    :param debug: enables debug output (slows server)
    :param request_timeout: time in seconds
    :param response_timeout: time in seconds
    :param keep_alive_timeout: time in seconds
    :param ssl: SSLContext
    :param sock: Socket for the server to accept connections from
    :param request_max_size: size in bytes, `None` for no limit
    :param reuse_port: `True` for multiple workers
    :param loop: asyncio compatible event loop
    :param protocol: subclass of asyncio protocol class
    :param request_class: Request class to use
    :param access_log: disable/enable access log
    :param websocket_max_size: enforces the maximum size for
                               incoming messages in bytes.
    :param websocket_max_queue: sets the maximum length of the queue
                                that holds incoming messages.
    :param websocket_read_limit: sets the high-water limit of the buffer for
                                 incoming bytes, the low-water limit is half
                                 the high-water limit.
    :param websocket_write_limit: sets the high-water limit of the buffer for
                                  outgoing bytes, the low-water limit is a
                                  quarter of the high-water limit.
    :param is_request_stream: disable/enable Request.stream
    :param request_buffer_queue_size: streaming request buffer queue size
    :param router: Router object
    :param graceful_shutdown_timeout: How long take to Force close non-idle
                                      connection
    :return: Nothing
    """
    if not run_async:
        # create new event_loop after fork
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    if debug:
        loop.set_debug(debug)

    connections = connections if connections is not None else set()
    # Factory handed to create_server: one protocol instance per connection.
    # The websocket_* kwargs are swallowed by HttpProtocol's **kwargs but
    # used by the websocket protocol subclass when one is passed in.
    server = partial(
        protocol,
        loop=loop,
        connections=connections,
        signal=signal,
        request_handler=request_handler,
        error_handler=error_handler,
        request_timeout=request_timeout,
        response_timeout=response_timeout,
        keep_alive_timeout=keep_alive_timeout,
        request_max_size=request_max_size,
        request_class=request_class,
        access_log=access_log,
        keep_alive=keep_alive,
        is_request_stream=is_request_stream,
        router=router,
        websocket_max_size=websocket_max_size,
        websocket_max_queue=websocket_max_queue,
        websocket_read_limit=websocket_read_limit,
        websocket_write_limit=websocket_write_limit,
        state=state,
        debug=debug,
    )

    server_coroutine = loop.create_server(
        server,
        host,
        port,
        ssl=ssl,
        reuse_port=reuse_port,
        sock=sock,
        backlog=backlog,
    )

    # Instead of pulling time at the end of every request,
    # cache it and refresh the cache once per second.
    loop.call_soon(partial(update_current_time, loop))

    if run_async:
        return server_coroutine

    trigger_events(before_start, loop)

    try:
        http_server = loop.run_until_complete(server_coroutine)
    except BaseException:
        logger.exception("Unable to start server")
        return

    trigger_events(after_start, loop)

    # Ignore SIGINT when run_multiple
    if run_multiple:
        signal_func(SIGINT, SIG_IGN)

    # Register signals for graceful termination
    if register_sys_signals:
        # NOTE(review): '_singals' is a historical typo for '_signals';
        # harmless (local name only) but worth renaming in a code change.
        _singals = (SIGTERM,) if run_multiple else (SIGINT, SIGTERM)
        for _signal in _singals:
            try:
                loop.add_signal_handler(_signal, loop.stop)
            except NotImplementedError:
                # e.g. Windows event loops lack add_signal_handler.
                logger.warning(
                    "Sanic tried to use loop.add_signal_handler "
                    "but it is not implemented on this platform."
                )
    pid = os.getpid()
    try:
        logger.info("Starting worker [%s]", pid)
        loop.run_forever()
    finally:
        logger.info("Stopping worker [%s]", pid)

        # Run the on_stop function if provided
        trigger_events(before_stop, loop)

        # Wait for event loop to finish and all connections to drain
        http_server.close()
        loop.run_until_complete(http_server.wait_closed())

        # Complete all tasks on the loop
        signal.stopped = True
        for connection in connections:
            connection.close_if_idle()

        # Gracefully shutdown timeout.
        # We should provide graceful_shutdown_timeout,
        # instead of letting connection hangs forever.
        # Let's roughly calcucate time.
        start_shutdown = 0
        while connections and (start_shutdown < graceful_shutdown_timeout):
            loop.run_until_complete(asyncio.sleep(0.1))
            start_shutdown = start_shutdown + 0.1

        # Force close non-idle connection after waiting for
        # graceful_shutdown_timeout
        coros = []
        for conn in connections:
            if hasattr(conn, "websocket") and conn.websocket:
                coros.append(conn.websocket.close_connection())
            else:
                conn.close()

        _shutdown = asyncio.gather(*coros, loop=loop)
        loop.run_until_complete(_shutdown)

        trigger_events(after_stop, loop)

        loop.close()
2017-02-27 00:31:39 +00:00
def serve_multiple(server_settings, workers):
    """Start multiple server processes simultaneously.  Stop on interrupt
    and terminate signals, and drain connections when complete.

    :param server_settings: kw arguments to be passed to the serve function
    :param workers: number of workers to launch
    :return: None
    """
    server_settings["reuse_port"] = True
    server_settings["run_multiple"] = True

    # Handling when custom socket is not provided.
    if server_settings.get("sock") is None:
        # Bind one shared socket in the parent so all workers can accept on
        # it; it is made inheritable for the forked/spawned children.
        sock = socket()
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind((server_settings["host"], server_settings["port"]))
        sock.set_inheritable(True)
        server_settings["sock"] = sock
        server_settings["host"] = None
        server_settings["port"] = None

    def sig_handler(signal, frame):
        # Fan the parent's SIGINT/SIGTERM out as SIGTERM to every worker.
        logger.info("Received signal %s. Shutting down.", Signals(signal).name)
        for process in processes:
            os.kill(process.pid, SIGTERM)

    signal_func(SIGINT, lambda s, f: sig_handler(s, f))
    signal_func(SIGTERM, lambda s, f: sig_handler(s, f))

    processes = []

    for _ in range(workers):
        process = Process(target=serve, kwargs=server_settings)
        process.daemon = True
        process.start()
        processes.append(process)

    for process in processes:
        process.join()

    # the above processes will block this until they're stopped
    for process in processes:
        process.terminate()

    server_settings.get("sock").close()