diff --git a/sanic/app.py b/sanic/app.py index ff680d9c..5b071200 100644 --- a/sanic/app.py +++ b/sanic/app.py @@ -701,7 +701,8 @@ class Sanic: 'backlog': backlog, 'has_log': has_log, 'websocket_max_size': self.config.WEBSOCKET_MAX_SIZE, - 'websocket_max_queue': self.config.WEBSOCKET_MAX_QUEUE + 'websocket_max_queue': self.config.WEBSOCKET_MAX_QUEUE, + 'graceful_shutdown_timeout': self.config.GRACEFUL_SHUTDOWN_TIMEOUT } # -------------------------------------------- # diff --git a/sanic/config.py b/sanic/config.py index f5649cfe..4af31532 100644 --- a/sanic/config.py +++ b/sanic/config.py @@ -127,6 +127,7 @@ class Config(dict): self.KEEP_ALIVE = keep_alive self.WEBSOCKET_MAX_SIZE = 2 ** 20 # 1 megabytes self.WEBSOCKET_MAX_QUEUE = 32 + self.GRACEFUL_SHUTDOWN_TIMEOUT = 15.0 # 15 sec if load_env: self.load_environment_vars() diff --git a/sanic/server.py b/sanic/server.py index 81b56bad..2ee48688 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -380,6 +380,14 @@ class HttpProtocol(asyncio.Protocol): return True return False + def close(self): + """ + Force close the connection. + """ + if self.transport is not None: + self.transport.close() + self.transport = None + def update_current_time(loop): """Cache the current time, since it is needed at the end of every @@ -412,7 +420,8 @@ def serve(host, port, request_handler, error_handler, before_start=None, register_sys_signals=True, run_async=False, connections=None, signal=Signal(), request_class=None, has_log=True, keep_alive=True, is_request_stream=False, router=None, websocket_max_size=None, - websocket_max_queue=None, state=None): + websocket_max_queue=None, state=None, + graceful_shutdown_timeout=15.0): """Start asynchronous HTTP Server on an individual process. :param host: Address to host on @@ -525,8 +534,26 @@ def serve(host, port, request_handler, error_handler, before_start=None, for connection in connections: connection.close_if_idle() - while connections: + # Gracefully shutdown timeout. + # We should provide graceful_shutdown_timeout, + # instead of letting connection hangs forever. + # Let's roughly calcucate time. + start_shutdown = 0 + while connections and (start_shutdown < graceful_shutdown_timeout): loop.run_until_complete(asyncio.sleep(0.1)) + start_shutdown = start_shutdown + 0.1 + + # Force close non-idle connection after waiting for + # graceful_shutdown_timeout + coros = [] + for conn in connections: + if hasattr(conn, "websocket") and conn.websocket: + coros.append(conn.websocket.close_connection(force=True)) + else: + conn.close() + + _shutdown = asyncio.gather(*coros, loop=loop) + loop.run_until_complete(_shutdown) trigger_events(after_stop, loop) diff --git a/sanic/worker.py b/sanic/worker.py index 876354ce..7c02053c 100644 --- a/sanic/worker.py +++ b/sanic/worker.py @@ -91,8 +91,24 @@ class GunicornWorker(base.Worker): for conn in self.connections: conn.close_if_idle() - while self.connections: + # gracefully shutdown timeout + start_shutdown = 0 + graceful_shutdown_timeout = self.cfg.graceful_timeout + while self.connections and \ + (start_shutdown < graceful_shutdown_timeout): await asyncio.sleep(0.1) + start_shutdown = start_shutdown + 0.1 + + # Force close non-idle connection after waiting for + # graceful_shutdown_timeout + coros = [] + for conn in self.connections: + if hasattr(conn, "websocket") and conn.websocket: + coros.append(conn.websocket.close_connection(force=True)) + else: + conn.close() + _shutdown = asyncio.gather(*coros, loop=self.loop) + await _shutdown async def _run(self): for sock in self.sockets: diff --git a/tests/test_worker.py b/tests/test_worker.py index e1a13368..e2b301ec 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -100,3 +100,36 @@ def test_run_max_requests_exceeded(worker): worker.notify.assert_called_with() worker.log.info.assert_called_with("Max requests exceeded, shutting down: %s", worker) + +def test_worker_close(worker): + loop = asyncio.new_event_loop() + asyncio.sleep = mock.Mock(wraps=asyncio.coroutine(lambda *a, **kw: None)) + worker.ppid = 1 + worker.pid = 2 + worker.cfg.graceful_timeout = 1.0 + worker.signal = mock.Mock() + worker.signal.stopped = False + worker.wsgi = mock.Mock() + conn = mock.Mock() + conn.websocket = mock.Mock() + conn.websocket.close_connection = mock.Mock( + wraps=asyncio.coroutine(lambda *a, **kw: None) + ) + worker.connections = set([conn]) + worker.log = mock.Mock() + worker.loop = loop + server = mock.Mock() + server.close = mock.Mock(wraps=lambda *a, **kw: None) + server.wait_closed = mock.Mock(wraps=asyncio.coroutine(lambda *a, **kw: None)) + worker.servers = { + server: {"requests_count": 14}, + } + worker.max_requests = 10 + + # close worker + _close = asyncio.ensure_future(worker.close(), loop=loop) + loop.run_until_complete(_close) + + assert worker.signal.stopped == True + conn.websocket.close_connection.assert_called_with(force=True) + assert len(worker.servers) == 0