From 5c19eb34bf20f6b8667614c761f3ef43a1258df5 Mon Sep 17 00:00:00 2001 From: Yun Xu Date: Sat, 24 Jun 2017 19:00:33 -0700 Subject: [PATCH 1/5] add graceful_shutdown_timeout --- sanic/server.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/sanic/server.py b/sanic/server.py index 369e790e..11c2452c 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -395,7 +395,7 @@ def serve(host, port, request_handler, error_handler, before_start=None, register_sys_signals=True, run_async=False, connections=None, signal=Signal(), request_class=None, has_log=True, keep_alive=True, is_request_stream=False, router=None, websocket_max_size=None, - websocket_max_queue=None, state=None): + websocket_max_queue=None, state=None, graceful_shutdown_timeout=15.0): """Start asynchronous HTTP Server on an individual process. :param host: Address to host on @@ -507,8 +507,26 @@ def serve(host, port, request_handler, error_handler, before_start=None, for connection in connections: connection.close_if_idle() - while connections: + # Gracefully shutdown timeout. + # We should provide graceful_shutdown_timeout, + # instead of letting connection hangs forever. + # Let's roughly calcucate time. + start_shutdown = 0 + while connections and (start_shutdown < graceful_shutdown_timeout): loop.run_until_complete(asyncio.sleep(0.1)) + start_shutdown = start_shutdown + 0.1 + + # Force close non-idle connection after waiting for + # graceful_shutdown_timeout + coros = [] + for conn in connections: + if hasattr(conn, "websocket") and conn.websocket: + coros.append(conn.websocket.close_connection(force=True)) + else: + conn.close() + + _shutdown = asyncio.gather(*coros, loop=loop) + loop.run_until_complete(_shutdown) trigger_events(after_stop, loop) From d812affef022b86b9c58a8110d60f5b13c3faa50 Mon Sep 17 00:00:00 2001 From: Yun Xu Date: Sun, 25 Jun 2017 00:51:14 -0700 Subject: [PATCH 2/5] add graceful_shutdown_timeout to gunicorn worker --- sanic/app.py | 3 ++- sanic/config.py | 1 + sanic/server.py | 8 ++++++++ sanic/worker.py | 18 +++++++++++++++++- 4 files changed, 28 insertions(+), 2 deletions(-) diff --git a/sanic/app.py b/sanic/app.py index ff680d9c..5b071200 100644 --- a/sanic/app.py +++ b/sanic/app.py @@ -701,7 +701,8 @@ class Sanic: 'backlog': backlog, 'has_log': has_log, 'websocket_max_size': self.config.WEBSOCKET_MAX_SIZE, - 'websocket_max_queue': self.config.WEBSOCKET_MAX_QUEUE + 'websocket_max_queue': self.config.WEBSOCKET_MAX_QUEUE, + 'graceful_shutdown_timeout': self.config.GRACEFUL_SHUTDOWN_TIMEOUT } # -------------------------------------------- # diff --git a/sanic/config.py b/sanic/config.py index e3563bc1..075bc47d 100644 --- a/sanic/config.py +++ b/sanic/config.py @@ -127,6 +127,7 @@ class Config(dict): self.KEEP_ALIVE = keep_alive self.WEBSOCKET_MAX_SIZE = 2 ** 20 # 1 megabytes self.WEBSOCKET_MAX_QUEUE = 32 + self.GRACEFUL_SHUTDOWN_TIMEOUT = 15.0 # 15 sec if load_env: self.load_environment_vars() diff --git a/sanic/server.py b/sanic/server.py index 11c2452c..7ceef655 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -363,6 +363,14 @@ class HttpProtocol(asyncio.Protocol): return True return False + def close(self): + """ + Force close the connection. + """ + if self.transport is not None: + self.transport.close() + self.transport = None + def update_current_time(loop): """Cache the current time, since it is needed at the end of every diff --git a/sanic/worker.py b/sanic/worker.py index 876354ce..3adef65f 100644 --- a/sanic/worker.py +++ b/sanic/worker.py @@ -91,8 +91,24 @@ class GunicornWorker(base.Worker): for conn in self.connections: conn.close_if_idle() - while self.connections: + # gracefully shutdown timeout + start_shutdown = 0 + graceful_shutdown_timeout = self.cfg.graceful_timeout + while self.connections and (start_shutdown < graceful_shutdown_timeout): await asyncio.sleep(0.1) + start_shutdown = start_shutdown + 0.1 + + # Force close non-idle connection after waiting for + # graceful_shutdown_timeout + coros = [] + for conn in self.connections: + if hasattr(conn, "websocket") and conn.websocket: + coros.append(conn.websocket.close_connection(force=True)) + else: + conn.close() + _shutdown = asyncio.gather(*coros, loop=self.loop) + await _shutdown + async def _run(self): for sock in self.sockets: From 7720e31a31834bbcabecaf18ec583543dc4a1cee Mon Sep 17 00:00:00 2001 From: Yun Xu Date: Sun, 25 Jun 2017 00:51:59 -0700 Subject: [PATCH 3/5] add unit test --- tests/test_worker.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/test_worker.py b/tests/test_worker.py index e1a13368..0e93c603 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -100,3 +100,35 @@ def test_run_max_requests_exceeded(worker): worker.notify.assert_called_with() worker.log.info.assert_called_with("Max requests exceeded, shutting down: %s", worker) + +def test_worker_close(worker): + loop = asyncio.get_event_loop() + worker.ppid = 1 + worker.pid = 2 + worker.cfg.graceful_timeout = 1.0 + worker.signal = mock.Mock() + worker.signal.stopped = False + worker.wsgi = mock.Mock() + conn = mock.Mock() + conn.websocket = mock.Mock() + conn.websocket.close_connection = mock.Mock( + wraps=asyncio.coroutine(lambda *a, **kw: None) + ) + worker.connections = set([conn]) + worker.log = mock.Mock() + worker.loop = loop + server = mock.Mock() + server.close = mock.Mock(wraps=lambda *a, **kw: None) + server.wait_closed = mock.Mock(wraps=asyncio.coroutine(lambda *a, **kw: None)) + worker.servers = { + server: {"requests_count": 14}, + } + worker.max_requests = 10 + + # close worker + _close = asyncio.ensure_future(worker.close(), loop=loop) + loop.run_until_complete(_close) + + assert worker.signal.stopped == True + conn.websocket.close_connection.assert_called_with(force=True) + assert len(worker.servers) == 0 From 221cf235b567071ca9001bdddd45ec2504439927 Mon Sep 17 00:00:00 2001 From: Yun Xu Date: Sun, 25 Jun 2017 01:03:28 -0700 Subject: [PATCH 4/5] fix a unit test --- tests/test_worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_worker.py b/tests/test_worker.py index 0e93c603..e2b301ec 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -102,7 +102,8 @@ def test_run_max_requests_exceeded(worker): worker) def test_worker_close(worker): - loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.sleep = mock.Mock(wraps=asyncio.coroutine(lambda *a, **kw: None)) worker.ppid = 1 worker.pid = 2 worker.cfg.graceful_timeout = 1.0 From b5d1f52ea45558ff8d935dffb41dfa654d1979e0 Mon Sep 17 00:00:00 2001 From: Yun Xu Date: Sun, 25 Jun 2017 10:22:40 -0700 Subject: [PATCH 5/5] make flake8 happy --- sanic/config.py | 2 +- sanic/server.py | 3 ++- sanic/worker.py | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sanic/config.py b/sanic/config.py index 075bc47d..2d24098c 100644 --- a/sanic/config.py +++ b/sanic/config.py @@ -127,7 +127,7 @@ class Config(dict): self.KEEP_ALIVE = keep_alive self.WEBSOCKET_MAX_SIZE = 2 ** 20 # 1 megabytes self.WEBSOCKET_MAX_QUEUE = 32 - self.GRACEFUL_SHUTDOWN_TIMEOUT = 15.0 # 15 sec + self.GRACEFUL_SHUTDOWN_TIMEOUT = 15.0 # 15 sec if load_env: self.load_environment_vars() diff --git a/sanic/server.py b/sanic/server.py index 7ceef655..f4babb89 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -403,7 +403,8 @@ def serve(host, port, request_handler, error_handler, before_start=None, register_sys_signals=True, run_async=False, connections=None, signal=Signal(), request_class=None, has_log=True, keep_alive=True, is_request_stream=False, router=None, websocket_max_size=None, - websocket_max_queue=None, state=None, graceful_shutdown_timeout=15.0): + websocket_max_queue=None, state=None, + graceful_shutdown_timeout=15.0): """Start asynchronous HTTP Server on an individual process. :param host: Address to host on diff --git a/sanic/worker.py b/sanic/worker.py index 3adef65f..7c02053c 100644 --- a/sanic/worker.py +++ b/sanic/worker.py @@ -94,7 +94,8 @@ class GunicornWorker(base.Worker): # gracefully shutdown timeout start_shutdown = 0 graceful_shutdown_timeout = self.cfg.graceful_timeout - while self.connections and (start_shutdown < graceful_shutdown_timeout): + while self.connections and \ + (start_shutdown < graceful_shutdown_timeout): await asyncio.sleep(0.1) start_shutdown = start_shutdown + 0.1 @@ -109,7 +110,6 @@ class GunicornWorker(base.Worker): _shutdown = asyncio.gather(*coros, loop=self.loop) await _shutdown - async def _run(self): for sock in self.sockets: state = dict(requests_count=0)