From 2b296435b3b77b07af9ae20b9469c01eacc5710a Mon Sep 17 00:00:00 2001 From: messense Date: Sun, 12 Mar 2017 22:42:34 +0800 Subject: [PATCH] Trigger events --- examples/simple_server.py | 3 +- examples/try_everything.py | 13 ++++++- sanic/server.py | 4 +- sanic/worker.py | 78 +++++++++++++++++++++++++++++++------- 4 files changed, 81 insertions(+), 17 deletions(-) diff --git a/examples/simple_server.py b/examples/simple_server.py index 24e3570f..a803feb8 100644 --- a/examples/simple_server.py +++ b/examples/simple_server.py @@ -9,4 +9,5 @@ async def test(request): return json({"test": True}) -app.run(host="0.0.0.0", port=8000) +if __name__ == '__main__': + app.run(host="0.0.0.0", port=8000) diff --git a/examples/try_everything.py b/examples/try_everything.py index f7191ecc..da3cc515 100644 --- a/examples/try_everything.py +++ b/examples/try_everything.py @@ -70,6 +70,11 @@ def query_string(request): # Run Server # ----------------------------------------------- # +@app.listener('before_server_start') +def before_start(app, loop): + log.info("SERVER STARTING") + + @app.listener('after_server_start') def after_start(app, loop): log.info("OH OH OH OH OHHHHHHHH") @@ -77,7 +82,13 @@ def after_start(app, loop): @app.listener('before_server_stop') def before_stop(app, loop): + log.info("SERVER STOPPING") + + +@app.listener('after_server_stop') +def after_stop(app, loop): log.info("TRIED EVERYTHING") -app.run(host="0.0.0.0", port=8000, debug=True) +if __name__ == '__main__': + app.run(host="0.0.0.0", port=8000, debug=True) diff --git a/sanic/server.py b/sanic/server.py index 11601c00..ee45e81b 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -313,7 +313,7 @@ def serve(host, port, request_handler, error_handler, before_start=None, after_start=None, before_stop=None, after_stop=None, debug=False, request_timeout=60, ssl=None, sock=None, request_max_size=None, reuse_port=False, loop=None, protocol=HttpProtocol, backlog=100, - register_sys_signals=True, run_async=False): + register_sys_signals=True, run_async=False, connections=None): """Start asynchronous HTTP Server on an individual process. :param host: Address to host on @@ -349,7 +349,7 @@ def serve(host, port, request_handler, error_handler, before_start=None, trigger_events(before_start, loop) - connections = set() + connections = connections or set() signal = Signal() server = partial( protocol, diff --git a/sanic/worker.py b/sanic/worker.py index e4906faf..cd1a6b9f 100644 --- a/sanic/worker.py +++ b/sanic/worker.py @@ -1,16 +1,29 @@ -import asyncio import os +import asyncio +import logging +try: + import ssl +except ImportError: + ssl = None import uvloop import gunicorn.workers.base as base +from sanic.server import trigger_events, serve, HttpProtocol +from sanic.websocket import WebSocketProtocol + class GunicornWorker(base.Worker): def __init__(self, *args, **kw): # pragma: no cover super().__init__(*args, **kw) + cfg = self.cfg + if cfg.is_ssl: + self.ssl_context = self._create_ssl_context(cfg) + else: + self.ssl_context = None self.servers = [] - self.connections = {} + self.connections = set() def init_process(self): # create new event_loop after fork @@ -28,20 +41,49 @@ class GunicornWorker(base.Worker): try: self.loop.run_until_complete(self._runner) finally: + trigger_events(self._server_settings.get('before_stop', []), self.loop) self.loop.close() + trigger_events(self._server_settings.get('after_stop', []), self.loop) async def close(self): - try: - if hasattr(self.wsgi, 'close'): - await self.wsgi.close() - except: - self.log.exception('Process shutdown exception') + if self.servers: + # stop accepting connections + self.log.info("Stopping server: %s, connections: %s", + self.pid, len(self.connections)) + for server in self.servers: + server.close() + await server.wait_closed() + self.servers.clear() + + # prepare connections for closing + for conn in self.connections: + conn.close_if_idle() + + while self.connections: + await asyncio.sleep(0.1) async def _run(self): + is_debug = self.log.loglevel == logging.DEBUG + protocol = (WebSocketProtocol if self.app.callable.websocket_enabled + else HttpProtocol) + self._server_settings = self.app.callable._helper( + host=None, + port=None, + loop=self.loop, + debug=is_debug, + protocol=protocol, + ssl=self.ssl_context, + run_async=True + ) + self._server_settings.pop('sock') for sock in self.sockets: - self.servers.append(await self.app.callable.create_server( - sock=sock, host=None, port=None, loop=self.loop)) + self.servers.append(await serve( + sock=sock, + connections=self.connections, + **self._server_settings + )) + trigger_events(self._server_settings.get('after_start', []), self.loop) # If our parent changed then we shut down. pid = os.getpid() try: @@ -56,8 +98,18 @@ class GunicornWorker(base.Worker): except (Exception, BaseException, GeneratorExit, KeyboardInterrupt): pass - if self.servers: - for server in self.servers: - server.close() - await self.close() + + @staticmethod + def _create_ssl_context(cfg): + """ Creates SSLContext instance for usage in asyncio.create_server. + See ssl.SSLSocket.__init__ for more details. + """ + ctx = ssl.SSLContext(cfg.ssl_version) + ctx.load_cert_chain(cfg.certfile, cfg.keyfile) + ctx.verify_mode = cfg.cert_reqs + if cfg.ca_certs: + ctx.load_verify_locations(cfg.ca_certs) + if cfg.ciphers: + ctx.set_ciphers(cfg.ciphers) + return ctx