From 19ee1dfecca09d23cc450df86c42e26ffd1548fb Mon Sep 17 00:00:00 2001 From: messense Date: Sun, 12 Mar 2017 16:19:34 +0800 Subject: [PATCH 01/11] Gunicorn worker --- sanic/app.py | 13 ++++++---- sanic/worker.py | 63 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 4 deletions(-) create mode 100644 sanic/worker.py diff --git a/sanic/app.py b/sanic/app.py index 2fd52fae..e101caf5 100644 --- a/sanic/app.py +++ b/sanic/app.py @@ -578,6 +578,10 @@ class Sanic: """This kills the Sanic""" get_event_loop().stop() + def __call__(self): + """gunicorn compatibility""" + return self + async def create_server(self, host="127.0.0.1", port=8000, debug=False, before_start=None, after_start=None, before_stop=None, after_stop=None, ssl=None, @@ -686,9 +690,10 @@ class Sanic: server_settings['run_async'] = True # Serve - proto = "http" - if ssl is not None: - proto = "https" - log.info('Goin\' Fast @ {}://{}:{}'.format(proto, host, port)) + if host and port: + proto = "http" + if ssl is not None: + proto = "https" + log.info('Goin\' Fast @ {}://{}:{}'.format(proto, host, port)) return server_settings diff --git a/sanic/worker.py b/sanic/worker.py new file mode 100644 index 00000000..e4906faf --- /dev/null +++ b/sanic/worker.py @@ -0,0 +1,63 @@ +import asyncio +import os + +import uvloop +import gunicorn.workers.base as base + + +class GunicornWorker(base.Worker): + + def __init__(self, *args, **kw): # pragma: no cover + super().__init__(*args, **kw) + self.servers = [] + self.connections = {} + + def init_process(self): + # create new event_loop after fork + asyncio.get_event_loop().close() + + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + super().init_process() + + def run(self): + self._runner = asyncio.async(self._run(), loop=self.loop) + + try: + self.loop.run_until_complete(self._runner) + finally: + self.loop.close() + + async def close(self): + try: + if hasattr(self.wsgi, 'close'): + await self.wsgi.close() + except: + self.log.exception('Process shutdown exception') + + async def _run(self): + for sock in self.sockets: + self.servers.append(await self.app.callable.create_server( + sock=sock, host=None, port=None, loop=self.loop)) + + # If our parent changed then we shut down. + pid = os.getpid() + try: + while self.alive: + self.notify() + + if pid == os.getpid() and self.ppid != os.getppid(): + self.alive = False + self.log.info("Parent changed, shutting down: %s", self) + else: + await asyncio.sleep(1.0, loop=self.loop) + except (Exception, BaseException, GeneratorExit, KeyboardInterrupt): + pass + + if self.servers: + for server in self.servers: + server.close() + + await self.close() From 2b296435b3b77b07af9ae20b9469c01eacc5710a Mon Sep 17 00:00:00 2001 From: messense Date: Sun, 12 Mar 2017 22:42:34 +0800 Subject: [PATCH 02/11] Trigger events --- examples/simple_server.py | 3 +- examples/try_everything.py | 13 ++++++- sanic/server.py | 4 +- sanic/worker.py | 78 +++++++++++++++++++++++++++++++------- 4 files changed, 81 insertions(+), 17 deletions(-) diff --git a/examples/simple_server.py b/examples/simple_server.py index 24e3570f..a803feb8 100644 --- a/examples/simple_server.py +++ b/examples/simple_server.py @@ -9,4 +9,5 @@ async def test(request): return json({"test": True}) -app.run(host="0.0.0.0", port=8000) +if __name__ == '__main__': + app.run(host="0.0.0.0", port=8000) diff --git a/examples/try_everything.py b/examples/try_everything.py index f7191ecc..da3cc515 100644 --- a/examples/try_everything.py +++ b/examples/try_everything.py @@ -70,6 +70,11 @@ def query_string(request): # Run Server # ----------------------------------------------- # +@app.listener('before_server_start') +def before_start(app, loop): + log.info("SERVER STARTING") + + @app.listener('after_server_start') def after_start(app, loop): log.info("OH OH OH OH OHHHHHHHH") @@ -77,7 +82,13 @@ def after_start(app, loop): @app.listener('before_server_stop') def before_stop(app, loop): + log.info("SERVER STOPPING") + + +@app.listener('after_server_stop') +def after_stop(app, loop): log.info("TRIED EVERYTHING") -app.run(host="0.0.0.0", port=8000, debug=True) +if __name__ == '__main__': + app.run(host="0.0.0.0", port=8000, debug=True) diff --git a/sanic/server.py b/sanic/server.py index 11601c00..ee45e81b 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -313,7 +313,7 @@ def serve(host, port, request_handler, error_handler, before_start=None, after_start=None, before_stop=None, after_stop=None, debug=False, request_timeout=60, ssl=None, sock=None, request_max_size=None, reuse_port=False, loop=None, protocol=HttpProtocol, backlog=100, - register_sys_signals=True, run_async=False): + register_sys_signals=True, run_async=False, connections=None): """Start asynchronous HTTP Server on an individual process. :param host: Address to host on @@ -349,7 +349,7 @@ def serve(host, port, request_handler, error_handler, before_start=None, trigger_events(before_start, loop) - connections = set() + connections = connections or set() signal = Signal() server = partial( protocol, diff --git a/sanic/worker.py b/sanic/worker.py index e4906faf..cd1a6b9f 100644 --- a/sanic/worker.py +++ b/sanic/worker.py @@ -1,16 +1,29 @@ -import asyncio import os +import asyncio +import logging +try: + import ssl +except ImportError: + ssl = None import uvloop import gunicorn.workers.base as base +from sanic.server import trigger_events, serve, HttpProtocol +from sanic.websocket import WebSocketProtocol + class GunicornWorker(base.Worker): def __init__(self, *args, **kw): # pragma: no cover super().__init__(*args, **kw) + cfg = self.cfg + if cfg.is_ssl: + self.ssl_context = self._create_ssl_context(cfg) + else: + self.ssl_context = None self.servers = [] - self.connections = {} + self.connections = set() def init_process(self): # create new event_loop after fork @@ -28,20 +41,49 @@ class GunicornWorker(base.Worker): try: self.loop.run_until_complete(self._runner) finally: + trigger_events(self._server_settings.get('before_stop', []), self.loop) self.loop.close() + trigger_events(self._server_settings.get('after_stop', []), self.loop) async def close(self): - try: - if hasattr(self.wsgi, 'close'): - await self.wsgi.close() - except: - self.log.exception('Process shutdown exception') + if self.servers: + # stop accepting connections + self.log.info("Stopping server: %s, connections: %s", + self.pid, len(self.connections)) + for server in self.servers: + server.close() + await server.wait_closed() + self.servers.clear() + + # prepare connections for closing + for conn in self.connections: + conn.close_if_idle() + + while self.connections: + await asyncio.sleep(0.1) async def _run(self): + is_debug = self.log.loglevel == logging.DEBUG + protocol = (WebSocketProtocol if self.app.callable.websocket_enabled + else HttpProtocol) + self._server_settings = self.app.callable._helper( + host=None, + port=None, + loop=self.loop, + debug=is_debug, + protocol=protocol, + ssl=self.ssl_context, + run_async=True + ) + self._server_settings.pop('sock') for sock in self.sockets: - self.servers.append(await self.app.callable.create_server( - sock=sock, host=None, port=None, loop=self.loop)) + self.servers.append(await serve( + sock=sock, + connections=self.connections, + **self._server_settings + )) + trigger_events(self._server_settings.get('after_start', []), self.loop) # If our parent changed then we shut down. pid = os.getpid() try: @@ -56,8 +98,18 @@ class GunicornWorker(base.Worker): except (Exception, BaseException, GeneratorExit, KeyboardInterrupt): pass - if self.servers: - for server in self.servers: - server.close() - await self.close() + + @staticmethod + def _create_ssl_context(cfg): + """ Creates SSLContext instance for usage in asyncio.create_server. + See ssl.SSLSocket.__init__ for more details. + """ + ctx = ssl.SSLContext(cfg.ssl_version) + ctx.load_cert_chain(cfg.certfile, cfg.keyfile) + ctx.verify_mode = cfg.cert_reqs + if cfg.ca_certs: + ctx.load_verify_locations(cfg.ca_certs) + if cfg.ciphers: + ctx.set_ciphers(cfg.ciphers) + return ctx From f35442ad1b1e78007e5daf1c5172f3c90dea70b9 Mon Sep 17 00:00:00 2001 From: messense Date: Sun, 12 Mar 2017 23:06:44 +0800 Subject: [PATCH 03/11] Fix RuntimeError: this event loop is already running --- sanic/worker.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sanic/worker.py b/sanic/worker.py index cd1a6b9f..a54a5a11 100644 --- a/sanic/worker.py +++ b/sanic/worker.py @@ -36,10 +36,12 @@ class GunicornWorker(base.Worker): super().init_process() def run(self): - self._runner = asyncio.async(self._run(), loop=self.loop) + self._runner = asyncio.ensure_future(self._run(), loop=self.loop) try: self.loop.run_until_complete(self._runner) + trigger_events(self._server_settings.get('after_start', []), self.loop) + self.loop.run_until_complete(self._check_alive()) finally: trigger_events(self._server_settings.get('before_stop', []), self.loop) self.loop.close() @@ -83,7 +85,7 @@ class GunicornWorker(base.Worker): **self._server_settings )) - trigger_events(self._server_settings.get('after_start', []), self.loop) + async def _check_alive(self): # If our parent changed then we shut down. pid = os.getpid() try: From decd3e737c17c550c280be2cd75df9ca0e06c9c6 Mon Sep 17 00:00:00 2001 From: messense Date: Mon, 13 Mar 2017 17:41:47 +0800 Subject: [PATCH 04/11] Flake8 fix --- sanic/worker.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sanic/worker.py b/sanic/worker.py index a54a5a11..4e70f10d 100644 --- a/sanic/worker.py +++ b/sanic/worker.py @@ -40,12 +40,15 @@ class GunicornWorker(base.Worker): try: self.loop.run_until_complete(self._runner) - trigger_events(self._server_settings.get('after_start', []), self.loop) + trigger_events(self._server_settings.get('after_start', []), + self.loop) self.loop.run_until_complete(self._check_alive()) finally: - trigger_events(self._server_settings.get('before_stop', []), self.loop) + trigger_events(self._server_settings.get('before_stop', []), + self.loop) self.loop.close() - trigger_events(self._server_settings.get('after_stop', []), self.loop) + trigger_events(self._server_settings.get('after_stop', []), + self.loop) async def close(self): if self.servers: From 7ca9116e370d3a82dbb48c9d6aa4492cab44e6b5 Mon Sep 17 00:00:00 2001 From: messense Date: Tue, 14 Mar 2017 23:56:52 +0800 Subject: [PATCH 05/11] Trigger before_stop before closing server, after_stop before closing loop --- sanic/worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sanic/worker.py b/sanic/worker.py index 4e70f10d..c0825ac7 100644 --- a/sanic/worker.py +++ b/sanic/worker.py @@ -44,13 +44,13 @@ class GunicornWorker(base.Worker): self.loop) self.loop.run_until_complete(self._check_alive()) finally: - trigger_events(self._server_settings.get('before_stop', []), - self.loop) - self.loop.close() trigger_events(self._server_settings.get('after_stop', []), self.loop) + self.loop.close() async def close(self): + trigger_events(self._server_settings.get('before_stop', []), + self.loop) if self.servers: # stop accepting connections self.log.info("Stopping server: %s, connections: %s", From 466b34735c02779ff9acbade8e278a589b39c108 Mon Sep 17 00:00:00 2001 From: messense Date: Wed, 15 Mar 2017 00:07:42 +0800 Subject: [PATCH 06/11] Set app.is_running to True --- sanic/worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sanic/worker.py b/sanic/worker.py index c0825ac7..3312b485 100644 --- a/sanic/worker.py +++ b/sanic/worker.py @@ -40,6 +40,7 @@ class GunicornWorker(base.Worker): try: self.loop.run_until_complete(self._runner) + self.app.callable.is_running = True trigger_events(self._server_settings.get('after_start', []), self.loop) self.loop.run_until_complete(self._check_alive()) From a90d70feae59f702e56e771cf84d12a6ee75beed Mon Sep 17 00:00:00 2001 From: messense Date: Wed, 15 Mar 2017 17:43:47 +0800 Subject: [PATCH 07/11] Check connections is not None --- sanic/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sanic/server.py b/sanic/server.py index ee45e81b..f044517e 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -349,7 +349,7 @@ def serve(host, port, request_handler, error_handler, before_start=None, trigger_events(before_start, loop) - connections = connections or set() + connections = connections if connections is not None else set() signal = Signal() server = partial( protocol, From 11a3cf9b99063a58c5ff06c81db8888fef32cdc9 Mon Sep 17 00:00:00 2001 From: messense Date: Wed, 15 Mar 2017 18:01:52 +0800 Subject: [PATCH 08/11] Add signal handling --- sanic/worker.py | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/sanic/worker.py b/sanic/worker.py index 3312b485..10b7e19b 100644 --- a/sanic/worker.py +++ b/sanic/worker.py @@ -1,4 +1,6 @@ import os +import sys +import signal import asyncio import logging try: @@ -24,6 +26,7 @@ class GunicornWorker(base.Worker): self.ssl_context = None self.servers = [] self.connections = set() + self.exit_code = 0 def init_process(self): # create new event_loop after fork @@ -49,6 +52,8 @@ class GunicornWorker(base.Worker): self.loop) self.loop.close() + sys.exit(self.exit_code) + async def close(self): trigger_events(self._server_settings.get('before_stop', []), self.loop) @@ -119,3 +124,38 @@ class GunicornWorker(base.Worker): if cfg.ciphers: ctx.set_ciphers(cfg.ciphers) return ctx + + def init_signals(self): + # Set up signals through the event loop API. + + self.loop.add_signal_handler(signal.SIGQUIT, self.handle_quit, + signal.SIGQUIT, None) + + self.loop.add_signal_handler(signal.SIGTERM, self.handle_exit, + signal.SIGTERM, None) + + self.loop.add_signal_handler(signal.SIGINT, self.handle_quit, + signal.SIGINT, None) + + self.loop.add_signal_handler(signal.SIGWINCH, self.handle_winch, + signal.SIGWINCH, None) + + self.loop.add_signal_handler(signal.SIGUSR1, self.handle_usr1, + signal.SIGUSR1, None) + + self.loop.add_signal_handler(signal.SIGABRT, self.handle_abort, + signal.SIGABRT, None) + + # Don't let SIGTERM and SIGUSR1 disturb active requests + # by interrupting system calls + signal.siginterrupt(signal.SIGTERM, False) + signal.siginterrupt(signal.SIGUSR1, False) + + def handle_quit(self, sig, frame): + self.alive = False + self.cfg.worker_int(self) + + def handle_abort(self, sig, frame): + self.alive = False + self.exit_code = 1 + self.cfg.worker_abort(self) From e27812bf3e843e001ea449d4ec588baf9d4d3d3c Mon Sep 17 00:00:00 2001 From: messense Date: Thu, 16 Mar 2017 11:55:10 +0800 Subject: [PATCH 09/11] Set `signal.stopped = True` on closing --- sanic/server.py | 4 ++-- sanic/worker.py | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sanic/server.py b/sanic/server.py index f044517e..00bb2331 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -313,7 +313,8 @@ def serve(host, port, request_handler, error_handler, before_start=None, after_start=None, before_stop=None, after_stop=None, debug=False, request_timeout=60, ssl=None, sock=None, request_max_size=None, reuse_port=False, loop=None, protocol=HttpProtocol, backlog=100, - register_sys_signals=True, run_async=False, connections=None): + register_sys_signals=True, run_async=False, connections=None, + signal=Signal()): """Start asynchronous HTTP Server on an individual process. :param host: Address to host on @@ -350,7 +351,6 @@ def serve(host, port, request_handler, error_handler, before_start=None, trigger_events(before_start, loop) connections = connections if connections is not None else set() - signal = Signal() server = partial( protocol, loop=loop, diff --git a/sanic/worker.py b/sanic/worker.py index 10b7e19b..2ed2672e 100644 --- a/sanic/worker.py +++ b/sanic/worker.py @@ -11,7 +11,7 @@ except ImportError: import uvloop import gunicorn.workers.base as base -from sanic.server import trigger_events, serve, HttpProtocol +from sanic.server import trigger_events, serve, HttpProtocol, Signal from sanic.websocket import WebSocketProtocol @@ -27,6 +27,7 @@ class GunicornWorker(base.Worker): self.servers = [] self.connections = set() self.exit_code = 0 + self.signal = Signal() def init_process(self): # create new event_loop after fork @@ -67,6 +68,7 @@ class GunicornWorker(base.Worker): self.servers.clear() # prepare connections for closing + self.signal.stopped = True for conn in self.connections: conn.close_if_idle() @@ -91,6 +93,7 @@ class GunicornWorker(base.Worker): self.servers.append(await serve( sock=sock, connections=self.connections, + signal=self.signal, **self._server_settings )) From d1fb5bdc300575197e77e8a63fb8b87264d03768 Mon Sep 17 00:00:00 2001 From: messense Date: Thu, 16 Mar 2017 17:47:01 +0800 Subject: [PATCH 10/11] Fix async before_server_start hook bug --- sanic/worker.py | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/sanic/worker.py b/sanic/worker.py index 2ed2672e..6fa9a25e 100644 --- a/sanic/worker.py +++ b/sanic/worker.py @@ -40,8 +40,24 @@ class GunicornWorker(base.Worker): super().init_process() def run(self): - self._runner = asyncio.ensure_future(self._run(), loop=self.loop) + is_debug = self.log.loglevel == logging.DEBUG + protocol = (WebSocketProtocol if self.app.callable.websocket_enabled + else HttpProtocol) + self._server_settings = self.app.callable._helper( + host=None, + port=None, + loop=self.loop, + debug=is_debug, + protocol=protocol, + ssl=self.ssl_context, + run_async=True + ) + self._server_settings.pop('sock') + trigger_events(self._server_settings.get('before_start', []), + self.loop) + self._server_settings['before_start'] = () + self._runner = asyncio.ensure_future(self._run(), loop=self.loop) try: self.loop.run_until_complete(self._runner) self.app.callable.is_running = True @@ -76,19 +92,6 @@ class GunicornWorker(base.Worker): await asyncio.sleep(0.1) async def _run(self): - is_debug = self.log.loglevel == logging.DEBUG - protocol = (WebSocketProtocol if self.app.callable.websocket_enabled - else HttpProtocol) - self._server_settings = self.app.callable._helper( - host=None, - port=None, - loop=self.loop, - debug=is_debug, - protocol=protocol, - ssl=self.ssl_context, - run_async=True - ) - self._server_settings.pop('sock') for sock in self.sockets: self.servers.append(await serve( sock=sock, From 1396ca903dfad87ed59d90564f97ac52844e05af Mon Sep 17 00:00:00 2001 From: messense Date: Mon, 20 Mar 2017 14:27:02 +0800 Subject: [PATCH 11/11] Fix before_stop event --- sanic/worker.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sanic/worker.py b/sanic/worker.py index 6fa9a25e..7a8303d8 100644 --- a/sanic/worker.py +++ b/sanic/worker.py @@ -64,6 +64,9 @@ class GunicornWorker(base.Worker): trigger_events(self._server_settings.get('after_start', []), self.loop) self.loop.run_until_complete(self._check_alive()) + trigger_events(self._server_settings.get('before_stop', []), + self.loop) + self.loop.run_until_complete(self.close()) finally: trigger_events(self._server_settings.get('after_stop', []), self.loop) @@ -72,8 +75,6 @@ class GunicornWorker(base.Worker): sys.exit(self.exit_code) async def close(self): - trigger_events(self._server_settings.get('before_stop', []), - self.loop) if self.servers: # stop accepting connections self.log.info("Stopping server: %s, connections: %s", @@ -115,8 +116,6 @@ class GunicornWorker(base.Worker): except (Exception, BaseException, GeneratorExit, KeyboardInterrupt): pass - await self.close() - @staticmethod def _create_ssl_context(cfg): """ Creates SSLContext instance for usage in asyncio.create_server.