From 4b968dc61178c4782644d2a0573cdf94188f0971 Mon Sep 17 00:00:00 2001 From: Adam Hopkins Date: Mon, 1 Mar 2021 15:03:26 +0200 Subject: [PATCH] Listeners for main server process (#2018) * Initial POC * Add test case * Resolve create_server and Gunicorn serve) * add testr coverage --- sanic/app.py | 12 +++++++++++- sanic/mixins/listeners.py | 2 ++ sanic/server.py | 30 ++++++++++++++++++++++++++++-- sanic/worker.py | 10 ++++++++++ tests/test_app.py | 14 ++++++++++++++ tests/test_multiprocessing.py | 34 ++++++++++++++++++++++++++++++++++ 6 files changed, 99 insertions(+), 3 deletions(-) diff --git a/sanic/app.py b/sanic/app.py index 998c49b8..5dc2efeb 100644 --- a/sanic/app.py +++ b/sanic/app.py @@ -59,6 +59,7 @@ from sanic.server import ( Signal, serve, serve_multiple, + serve_single, ) from sanic.websocket import ConnectionClosed, WebSocketProtocol @@ -817,7 +818,7 @@ class Sanic(BaseSanic): ) workers = 1 if workers == 1: - serve(**server_settings) + serve_single(server_settings) else: serve_multiple(server_settings, workers) except BaseException: @@ -920,6 +921,13 @@ class Sanic(BaseSanic): server_settings.get("before_start", []), server_settings.get("loop"), ) + main_start = server_settings.pop("main_start", None) + main_stop = server_settings.pop("main_stop", None) + if main_start or main_stop: + logger.warning( + "Listener events for the main process are not available " + "with create_server()" + ) return await serve( asyncio_server_kwargs=asyncio_server_kwargs, **server_settings @@ -1038,6 +1046,8 @@ class Sanic(BaseSanic): ("after_server_start", "after_start", False), ("before_server_stop", "before_stop", True), ("after_server_stop", "after_stop", True), + ("main_process_start", "main_start", False), + ("main_process_stop", "main_stop", True), ): listeners = self.listeners[event_name].copy() if reverse: diff --git a/sanic/mixins/listeners.py b/sanic/mixins/listeners.py index 95645fbd..94e09196 100644 --- a/sanic/mixins/listeners.py +++ b/sanic/mixins/listeners.py @@ -13,6 +13,8 @@ class ListenerEvent(str, Enum): AFTER_SERVER_START = auto() BEFORE_SERVER_STOP = auto() AFTER_SERVER_STOP = auto() + MAIN_PROCESS_START = auto() + MAIN_PROCESS_STOP = auto() class ListenerMixin: diff --git a/sanic/server.py b/sanic/server.py index 82869208..0e929226 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -490,7 +490,7 @@ def serve( create_server method :return: Nothing """ - if not run_async: + if not run_async and not loop: # create new event_loop after fork loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) @@ -603,7 +603,6 @@ def serve( trigger_events(after_stop, loop) - loop.close() remove_unix_socket(unix) @@ -700,6 +699,23 @@ def remove_unix_socket(path: Optional[str]) -> None: pass +def serve_single(server_settings): + main_start = server_settings.pop("main_start", None) + main_stop = server_settings.pop("main_stop", None) + + if not server_settings.get("run_async"): + # create new event_loop after fork + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + server_settings["loop"] = loop + + trigger_events(main_start, server_settings["loop"]) + serve(**server_settings) + trigger_events(main_stop, server_settings["loop"]) + + server_settings["loop"].close() + + def serve_multiple(server_settings, workers): """Start multiple server processes simultaneously. Stop on interrupt and terminate signals, and drain connections when complete. @@ -712,6 +728,13 @@ def serve_multiple(server_settings, workers): server_settings["reuse_port"] = True server_settings["run_multiple"] = True + main_start = server_settings.pop("main_start", None) + main_stop = server_settings.pop("main_stop", None) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + trigger_events(main_start, loop) + # Create a listening socket or use the one in settings sock = server_settings.get("sock") unix = server_settings["unix"] @@ -752,5 +775,8 @@ def serve_multiple(server_settings, workers): for process in processes: process.terminate() + trigger_events(main_stop, loop) + sock.close() + loop.close() remove_unix_socket(unix) diff --git a/sanic/worker.py b/sanic/worker.py index 765f26f7..169b875c 100644 --- a/sanic/worker.py +++ b/sanic/worker.py @@ -7,6 +7,7 @@ import traceback from gunicorn.workers import base as base # type: ignore +from sanic.log import logger from sanic.server import HttpProtocol, Signal, serve, trigger_events from sanic.websocket import WebSocketProtocol @@ -72,6 +73,15 @@ class GunicornWorker(base.Worker): ) self._server_settings["before_start"] = () + main_start = self._server_settings.pop("main_start", None) + main_stop = self._server_settings.pop("main_stop", None) + + if main_start or main_stop: # noqa + logger.warning( + "Listener events for the main process are not available " + "with GunicornWorker" + ) + self._runner = asyncio.ensure_future(self._run(), loop=self.loop) try: self.loop.run_until_complete(self._runner) diff --git a/tests/test_app.py b/tests/test_app.py index b5810000..6ed0293e 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -70,6 +70,20 @@ def test_asyncio_server_start_serving(app): # Looks like we can't easily test `serve_forever()` +def test_create_server_main(app, caplog): + app.listener("main_process_start")(lambda *_: ...) + loop = asyncio.get_event_loop() + with caplog.at_level(logging.INFO): + asyncio_srv_coro = app.create_server(return_asyncio_server=True) + loop.run_until_complete(asyncio_srv_coro) + assert ( + "sanic.root", + 30, + "Listener events for the main process are not available with " + "create_server()", + ) in caplog.record_tuples + + def test_app_loop_not_running(app): with pytest.raises(SanicException) as excinfo: app.loop diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py index 25f5eeac..ec29b3ae 100644 --- a/tests/test_multiprocessing.py +++ b/tests/test_multiprocessing.py @@ -1,3 +1,4 @@ +import logging import multiprocessing import pickle import random @@ -8,6 +9,7 @@ import pytest from sanic_testing.testing import HOST, PORT from sanic import Blueprint +from sanic.log import logger from sanic.response import text @@ -108,3 +110,35 @@ def test_pickle_app_with_static(app, protocol): assert up_p_app request, response = up_p_app.test_client.get("/static/missing.txt") assert response.status == 404 + + +def test_main_process_event(app, caplog): + # Selects a number at random so we can spot check + num_workers = random.choice(range(2, multiprocessing.cpu_count() * 2 + 1)) + + def stop_on_alarm(*args): + for process in multiprocessing.active_children(): + process.terminate() + + signal.signal(signal.SIGALRM, stop_on_alarm) + signal.alarm(1) + + @app.listener("main_process_start") + def main_process_start(app, loop): + logger.info("main_process_start") + + @app.listener("main_process_stop") + def main_process_stop(app, loop): + logger.info("main_process_stop") + + with caplog.at_level(logging.INFO): + app.run(HOST, PORT, workers=num_workers) + + assert ( + caplog.record_tuples.count(("sanic.root", 20, "main_process_start")) + == 1 + ) + assert ( + caplog.record_tuples.count(("sanic.root", 20, "main_process_stop")) + == 1 + )