Merge pull request #815 from yunstanford/master

add graceful timeout when shutdown
This commit is contained in:
Raphael Deem 2017-07-08 11:31:37 -07:00 committed by GitHub
commit f0a956467c
5 changed files with 82 additions and 4 deletions

View File

@ -701,7 +701,8 @@ class Sanic:
'backlog': backlog, 'backlog': backlog,
'has_log': has_log, 'has_log': has_log,
'websocket_max_size': self.config.WEBSOCKET_MAX_SIZE, 'websocket_max_size': self.config.WEBSOCKET_MAX_SIZE,
'websocket_max_queue': self.config.WEBSOCKET_MAX_QUEUE 'websocket_max_queue': self.config.WEBSOCKET_MAX_QUEUE,
'graceful_shutdown_timeout': self.config.GRACEFUL_SHUTDOWN_TIMEOUT
} }
# -------------------------------------------- # # -------------------------------------------- #

View File

@ -127,6 +127,7 @@ class Config(dict):
self.KEEP_ALIVE = keep_alive self.KEEP_ALIVE = keep_alive
self.WEBSOCKET_MAX_SIZE = 2 ** 20 # 1 megabytes self.WEBSOCKET_MAX_SIZE = 2 ** 20 # 1 megabytes
self.WEBSOCKET_MAX_QUEUE = 32 self.WEBSOCKET_MAX_QUEUE = 32
self.GRACEFUL_SHUTDOWN_TIMEOUT = 15.0 # 15 sec
if load_env: if load_env:
self.load_environment_vars() self.load_environment_vars()

View File

@ -380,6 +380,14 @@ class HttpProtocol(asyncio.Protocol):
return True return True
return False return False
def close(self):
"""
Force close the connection.
"""
if self.transport is not None:
self.transport.close()
self.transport = None
def update_current_time(loop): def update_current_time(loop):
"""Cache the current time, since it is needed at the end of every """Cache the current time, since it is needed at the end of every
@ -412,7 +420,8 @@ def serve(host, port, request_handler, error_handler, before_start=None,
register_sys_signals=True, run_async=False, connections=None, register_sys_signals=True, run_async=False, connections=None,
signal=Signal(), request_class=None, has_log=True, keep_alive=True, signal=Signal(), request_class=None, has_log=True, keep_alive=True,
is_request_stream=False, router=None, websocket_max_size=None, is_request_stream=False, router=None, websocket_max_size=None,
websocket_max_queue=None, state=None): websocket_max_queue=None, state=None,
graceful_shutdown_timeout=15.0):
"""Start asynchronous HTTP Server on an individual process. """Start asynchronous HTTP Server on an individual process.
:param host: Address to host on :param host: Address to host on
@ -525,8 +534,26 @@ def serve(host, port, request_handler, error_handler, before_start=None,
for connection in connections: for connection in connections:
connection.close_if_idle() connection.close_if_idle()
while connections: # Gracefully shutdown timeout.
# We should provide graceful_shutdown_timeout,
# instead of letting connection hangs forever.
# Let's roughly calcucate time.
start_shutdown = 0
while connections and (start_shutdown < graceful_shutdown_timeout):
loop.run_until_complete(asyncio.sleep(0.1)) loop.run_until_complete(asyncio.sleep(0.1))
start_shutdown = start_shutdown + 0.1
# Force close non-idle connection after waiting for
# graceful_shutdown_timeout
coros = []
for conn in connections:
if hasattr(conn, "websocket") and conn.websocket:
coros.append(conn.websocket.close_connection(force=True))
else:
conn.close()
_shutdown = asyncio.gather(*coros, loop=loop)
loop.run_until_complete(_shutdown)
trigger_events(after_stop, loop) trigger_events(after_stop, loop)

View File

@ -91,8 +91,24 @@ class GunicornWorker(base.Worker):
for conn in self.connections: for conn in self.connections:
conn.close_if_idle() conn.close_if_idle()
while self.connections: # gracefully shutdown timeout
start_shutdown = 0
graceful_shutdown_timeout = self.cfg.graceful_timeout
while self.connections and \
(start_shutdown < graceful_shutdown_timeout):
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
start_shutdown = start_shutdown + 0.1
# Force close non-idle connection after waiting for
# graceful_shutdown_timeout
coros = []
for conn in self.connections:
if hasattr(conn, "websocket") and conn.websocket:
coros.append(conn.websocket.close_connection(force=True))
else:
conn.close()
_shutdown = asyncio.gather(*coros, loop=self.loop)
await _shutdown
async def _run(self): async def _run(self):
for sock in self.sockets: for sock in self.sockets:

View File

@ -100,3 +100,36 @@ def test_run_max_requests_exceeded(worker):
worker.notify.assert_called_with() worker.notify.assert_called_with()
worker.log.info.assert_called_with("Max requests exceeded, shutting down: %s", worker.log.info.assert_called_with("Max requests exceeded, shutting down: %s",
worker) worker)
def test_worker_close(worker):
loop = asyncio.new_event_loop()
asyncio.sleep = mock.Mock(wraps=asyncio.coroutine(lambda *a, **kw: None))
worker.ppid = 1
worker.pid = 2
worker.cfg.graceful_timeout = 1.0
worker.signal = mock.Mock()
worker.signal.stopped = False
worker.wsgi = mock.Mock()
conn = mock.Mock()
conn.websocket = mock.Mock()
conn.websocket.close_connection = mock.Mock(
wraps=asyncio.coroutine(lambda *a, **kw: None)
)
worker.connections = set([conn])
worker.log = mock.Mock()
worker.loop = loop
server = mock.Mock()
server.close = mock.Mock(wraps=lambda *a, **kw: None)
server.wait_closed = mock.Mock(wraps=asyncio.coroutine(lambda *a, **kw: None))
worker.servers = {
server: {"requests_count": 14},
}
worker.max_requests = 10
# close worker
_close = asyncio.ensure_future(worker.close(), loop=loop)
loop.run_until_complete(_close)
assert worker.signal.stopped == True
conn.websocket.close_connection.assert_called_with(force=True)
assert len(worker.servers) == 0