From e0839fe1304052ac40ee33981815d35ea9f6ebd0 Mon Sep 17 00:00:00 2001 From: Adam Hopkins Date: Tue, 29 Aug 2023 19:17:50 +0300 Subject: [PATCH] Cleaner process management --- sanic/app.py | 16 ++++++++++++++-- sanic/mixins/startup.py | 25 ++++++++++++++++++++++--- sanic/server/runners.py | 16 ++++++++-------- sanic/worker/manager.py | 6 ++++++ sanic/worker/multiplexer.py | 18 ++++++++++++++++++ sanic/worker/process.py | 25 ++++++++++++++++++++++++- 6 files changed, 92 insertions(+), 14 deletions(-) diff --git a/sanic/app.py b/sanic/app.py index 57b68fc4..c378e505 100644 --- a/sanic/app.py +++ b/sanic/app.py @@ -5,7 +5,6 @@ import logging import logging.config import re import sys - from asyncio import ( AbstractEventLoop, CancelledError, @@ -94,7 +93,6 @@ from sanic.worker.inspector import Inspector from sanic.worker.loader import CertLoader from sanic.worker.manager import WorkerManager - if TYPE_CHECKING: try: from sanic_ext import Extend # type: ignore @@ -1743,6 +1741,20 @@ class Sanic( if hasattr(self, "multiplexer"): self.multiplexer.ack() + def set_serving(self, serving: bool) -> None: + """Set the serving state of the application. + + This method is used to set the serving state of the application. + It is used internally by Sanic and should not typically be called + manually. + + Args: + serving (bool): Whether the application is serving. + """ + self.state.is_running = serving + if hasattr(self, "multiplexer"): + self.multiplexer.set_serving(serving) + async def _server_event( self, concern: str, diff --git a/sanic/mixins/startup.py b/sanic/mixins/startup.py index f6b5630e..c676f205 100644 --- a/sanic/mixins/startup.py +++ b/sanic/mixins/startup.py @@ -3,7 +3,6 @@ from __future__ import annotations import os import platform import sys - from asyncio import ( AbstractEventLoop, CancelledError, @@ -65,13 +64,13 @@ from sanic.server.protocols.http_protocol import HttpProtocol from sanic.server.protocols.websocket_protocol import WebSocketProtocol from sanic.server.runners import serve from sanic.server.socket import configure_socket, remove_unix_socket +from sanic.worker.constants import ProcessState from sanic.worker.loader import AppLoader from sanic.worker.manager import WorkerManager from sanic.worker.multiplexer import WorkerMultiplexer from sanic.worker.reloader import Reloader from sanic.worker.serve import worker_serve - if TYPE_CHECKING: from sanic import Sanic from sanic.application.state import ApplicationState @@ -893,7 +892,6 @@ class StartupMixin(metaclass=SanicMeta): app.router.reset() app.signal_router.reset() - sync_manager.shutdown() for sock in socks: try: sock.shutdown(SHUT_RDWR) @@ -905,12 +903,33 @@ class StartupMixin(metaclass=SanicMeta): loop.close() cls._cleanup_env_vars() cls._cleanup_apps() + + from time import sleep + + limit = 100 + while cls._get_process_states(worker_state): + sleep(0.1) + limit -= 1 + if limit <= 0: + error_logger.warning( + "Worker shutdown timed out. " + "Some processes may still be running." + ) + break + sync_manager.shutdown() unix = kwargs.get("unix") if unix: remove_unix_socket(unix) + logger.info("Goodbye.") if exit_code: os._exit(exit_code) + @staticmethod + def _get_process_states(worker_state) -> List[str]: + return [ + state for s in worker_state.values() if (state := s.get("state")) + ] + @classmethod def serve_single(cls, primary: Optional[Sanic] = None) -> None: os.environ["SANIC_MOTD_OUTPUT"] = "true" diff --git a/sanic/server/runners.py b/sanic/server/runners.py index e56e8f37..0be965de 100644 --- a/sanic/server/runners.py +++ b/sanic/server/runners.py @@ -122,17 +122,15 @@ def _setup_system_signals( register_sys_signals: bool, loop: asyncio.AbstractEventLoop, ) -> None: # no cov - # Ignore SIGINT when run_multiple - if run_multiple: - signal_func(SIGINT, SIG_IGN) - os.environ["SANIC_WORKER_PROCESS"] = "true" - + signal_func(SIGINT, SIG_IGN) + signal_func(SIGTERM, SIG_IGN) + os.environ["SANIC_WORKER_PROCESS"] = "true" # Register signals for graceful termination if register_sys_signals: if OS_IS_WINDOWS: ctrlc_workaround_for_windows(app) else: - for _signal in [SIGTERM] if run_multiple else [SIGINT, SIGTERM]: + for _signal in [SIGINT, SIGTERM]: loop.add_signal_handler( _signal, partial(app.stop, terminate=False) ) @@ -143,8 +141,6 @@ def _run_server_forever(loop, before_stop, after_stop, cleanup, unix): try: server_logger.info("Starting worker [%s]", pid) loop.run_forever() - except KeyboardInterrupt: - pass finally: server_logger.info("Stopping worker [%s]", pid) @@ -156,6 +152,7 @@ def _run_server_forever(loop, before_stop, after_stop, cleanup, unix): loop.run_until_complete(after_stop()) remove_unix_socket(unix) loop.close() + server_logger.info("Worker complete [%s]", pid) def _serve_http_1( @@ -259,8 +256,11 @@ def _serve_http_1( else: conn.abort() + app.set_serving(False) + _setup_system_signals(app, run_multiple, register_sys_signals, loop) loop.run_until_complete(app._server_event("init", "after")) + app.set_serving(True) _run_server_forever( loop, partial(app._server_event, "shutdown", "before"), diff --git a/sanic/worker/manager.py b/sanic/worker/manager.py index abdaa5cf..c0580d8e 100644 --- a/sanic/worker/manager.py +++ b/sanic/worker/manager.py @@ -122,6 +122,7 @@ class WorkerManager: self.monitor() self.join() self.terminate() + self.cleanup() def start(self): for process in self.processes: @@ -147,6 +148,11 @@ class WorkerManager: for process in self.processes: process.terminate() + def cleanup(self): + """Cleanup the worker processes.""" + for process in self.processes: + process.exit() + def restart( self, process_names: Optional[List[str]] = None, diff --git a/sanic/worker/multiplexer.py b/sanic/worker/multiplexer.py index 7bd956e4..323802c6 100644 --- a/sanic/worker/multiplexer.py +++ b/sanic/worker/multiplexer.py @@ -28,6 +28,24 @@ class WorkerMultiplexer: "state": ProcessState.ACKED.name, } + def set_serving(self, serving: bool) -> None: + """Set the worker to serving. + + Args: + serving (bool): Whether the worker is serving. + """ + self._state._state[self.name] = { + **self._state._state[self.name], + "serving": serving, + } + + def exit(self): + """Run cleanup at worker exit.""" + try: + del self._state._state[self.name] + except ConnectionRefusedError: + logger.debug("Monitor process has already exited.") + def restart( self, name: str = "", diff --git a/sanic/worker/process.py b/sanic/worker/process.py index d186456c..b93ebcf4 100644 --- a/sanic/worker/process.py +++ b/sanic/worker/process.py @@ -65,6 +65,20 @@ class WorkerProcess: self.set_state(ProcessState.JOINED) self._current_process.join() + def exit(self): + limit = 100 + while self.is_alive() and limit > 0: + sleep(0.1) + limit -= 1 + + if not self.is_alive(): + try: + del self.worker_state[self.name] + except ConnectionRefusedError: + logger.debug("Monitor process has already exited.") + except KeyError: + logger.debug("Could not find worker state to delete.") + def terminate(self): if self.state is not ProcessState.TERMINATED: logger.debug( @@ -77,7 +91,6 @@ class WorkerProcess: self.set_state(ProcessState.TERMINATED, force=True) try: os.kill(self.pid, SIGINT) - del self.worker_state[self.name] except (KeyError, AttributeError, ProcessLookupError): ... @@ -118,6 +131,16 @@ class WorkerProcess: except AssertionError: return False + # def _run(self, **kwargs): + # atexit.register(self._exit) + # self.target(**kwargs) + + # def _exit(self): + # try: + # del self.worker_state[self.name] + # except ConnectionRefusedError: + # logger.debug("Monitor process has already exited.") + def spawn(self): if self.state not in (ProcessState.IDLE, ProcessState.RESTARTING): raise Exception("Cannot spawn a worker process until it is idle.")