From f32437bf1344cbc2ca2b0cbf3062ab04a93157bb Mon Sep 17 00:00:00 2001 From: Adam Hopkins Date: Wed, 7 Dec 2022 14:42:17 +0200 Subject: [PATCH] Kill server early on worker error (#2610) --- sanic/exceptions.py | 4 + sanic/mixins/startup.py | 6 ++ sanic/worker/manager.py | 36 ++++++-- sanic/worker/multiplexer.py | 5 +- sanic/worker/serve.py | 145 ++++++++++++++++-------------- tests/test_unix_socket.py | 3 +- tests/worker/test_manager.py | 4 +- tests/worker/test_worker_serve.py | 12 ++- 8 files changed, 137 insertions(+), 78 deletions(-) diff --git a/sanic/exceptions.py b/sanic/exceptions.py index 1190e36e..8baac0a4 100644 --- a/sanic/exceptions.py +++ b/sanic/exceptions.py @@ -8,6 +8,10 @@ class RequestCancelled(CancelledError): quiet = True +class ServerKilled(Exception): + ... + + class SanicException(Exception): message: str = "" diff --git a/sanic/mixins/startup.py b/sanic/mixins/startup.py index ff220f35..78abb884 100644 --- a/sanic/mixins/startup.py +++ b/sanic/mixins/startup.py @@ -41,6 +41,7 @@ from sanic.application.motd import MOTD from sanic.application.state import ApplicationServerInfo, Mode, ServerStage from sanic.base.meta import SanicMeta from sanic.compat import OS_IS_WINDOWS, is_atty +from sanic.exceptions import ServerKilled from sanic.helpers import Default from sanic.http.constants import HTTP from sanic.http.tls import get_ssl_context, process_to_context @@ -740,6 +741,7 @@ class StartupMixin(metaclass=SanicMeta): socks = [] sync_manager = Manager() setup_ext(primary) + exit_code = 0 try: primary_server_info.settings.pop("main_start", None) primary_server_info.settings.pop("main_stop", None) @@ -849,6 +851,8 @@ class StartupMixin(metaclass=SanicMeta): trigger_events(ready, loop, primary) manager.run() + except ServerKilled: + exit_code = 1 except BaseException: kwargs = primary_server_info.settings error_logger.exception( @@ -874,6 +878,8 @@ class StartupMixin(metaclass=SanicMeta): unix = kwargs.get("unix") if unix: remove_unix_socket(unix) + if exit_code: + os._exit(exit_code) @classmethod def serve_single(cls, primary: Optional[Sanic] = None) -> None: diff --git a/sanic/worker/manager.py b/sanic/worker/manager.py index adb7237a..ad77a138 100644 --- a/sanic/worker/manager.py +++ b/sanic/worker/manager.py @@ -1,12 +1,11 @@ import os -import sys from signal import SIGINT, SIGTERM, Signals from signal import signal as signal_func -from time import sleep from typing import List, Optional from sanic.compat import OS_IS_WINDOWS +from sanic.exceptions import ServerKilled from sanic.log import error_logger, logger from sanic.worker.process import ProcessState, Worker, WorkerProcess @@ -18,7 +17,7 @@ else: class WorkerManager: - THRESHOLD = 50 + THRESHOLD = 300 # == 30 seconds def __init__( self, @@ -130,13 +129,36 @@ class WorkerManager: def wait_for_ack(self): # no cov misses = 0 + message = ( + "It seems that one or more of your workers failed to come " + "online in the allowed time. Sanic is shutting down to avoid a " + f"deadlock. The current threshold is {self.THRESHOLD / 10}s. " + "If this problem persists, please check out the documentation " + "___." + ) while not self._all_workers_ack(): - sleep(0.1) + if self.monitor_subscriber.poll(0.1): + monitor_msg = self.monitor_subscriber.recv() + if monitor_msg != "__TERMINATE_EARLY__": + self.monitor_publisher.send(monitor_msg) + continue + misses = self.THRESHOLD + message = ( + "One of your worker processes terminated before startup " + "was completed. Please solve any errors experienced " + "during startup. If you do not see an exception traceback " + "in your error logs, try running Sanic in in a single " + "process using --single-process or single_process=True. " + "Once you are confident that the server is able to start " + "without errors you can switch back to multiprocess mode." + ) misses += 1 if misses > self.THRESHOLD: - error_logger.error("Not all workers are ack. Shutting down.") + error_logger.error( + "Not all workers acknowledged a successful startup. " + "Shutting down.\n\n" + message + ) self.kill() - sys.exit(1) @property def workers(self): @@ -156,7 +178,9 @@ class WorkerManager: def kill(self): for process in self.processes: + logger.info("Killing %s [%s]", process.name, process.pid) os.kill(process.pid, SIGKILL) + raise ServerKilled def shutdown_signal(self, signal, frame): logger.info("Received signal %s. Shutting down.", Signals(signal).name) diff --git a/sanic/worker/multiplexer.py b/sanic/worker/multiplexer.py index 25c7836f..f92e7e08 100644 --- a/sanic/worker/multiplexer.py +++ b/sanic/worker/multiplexer.py @@ -28,8 +28,9 @@ class WorkerMultiplexer: reload = restart # no cov - def terminate(self): - self._monitor_publisher.send("__TERMINATE__") + def terminate(self, early: bool = False): + message = "__TERMINATE_EARLY__" if early else "__TERMINATE__" + self._monitor_publisher.send(message) @property def pid(self) -> int: diff --git a/sanic/worker/serve.py b/sanic/worker/serve.py index a277824c..8d233069 100644 --- a/sanic/worker/serve.py +++ b/sanic/worker/serve.py @@ -1,6 +1,7 @@ import asyncio import os import socket +import warnings from functools import partial from multiprocessing.connection import Connection @@ -10,6 +11,7 @@ from typing import Any, Dict, List, Optional, Type, Union from sanic.application.constants import ServerStage from sanic.application.state import ApplicationServerInfo from sanic.http.constants import HTTP +from sanic.log import error_logger from sanic.models.server_types import Signal from sanic.server.protocols.http_protocol import HttpProtocol from sanic.server.runners import _serve_http_1, _serve_http_3 @@ -45,80 +47,93 @@ def worker_serve( config=None, passthru: Optional[Dict[str, Any]] = None, ): - from sanic import Sanic + try: + from sanic import Sanic - if app_loader: - app = app_loader.load() - else: - app = Sanic.get_app(app_name) + if app_loader: + app = app_loader.load() + else: + app = Sanic.get_app(app_name) - app.refresh(passthru) - app.setup_loop() + app.refresh(passthru) + app.setup_loop() - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) - # Hydrate server info if needed - if server_info: - for app_name, server_info_objects in server_info.items(): - a = Sanic.get_app(app_name) - if not a.state.server_info: - a.state.server_info = [] - for info in server_info_objects: - if not info.settings.get("app"): - info.settings["app"] = a - a.state.server_info.append(info) + # Hydrate server info if needed + if server_info: + for app_name, server_info_objects in server_info.items(): + a = Sanic.get_app(app_name) + if not a.state.server_info: + a.state.server_info = [] + for info in server_info_objects: + if not info.settings.get("app"): + info.settings["app"] = a + a.state.server_info.append(info) - if isinstance(ssl, dict): - cert_loader = CertLoader(ssl) - ssl = cert_loader.load(app) - for info in app.state.server_info: - info.settings["ssl"] = ssl + if isinstance(ssl, dict): + cert_loader = CertLoader(ssl) + ssl = cert_loader.load(app) + for info in app.state.server_info: + info.settings["ssl"] = ssl - # When in a worker process, do some init - if os.environ.get("SANIC_WORKER_NAME"): - # Hydrate apps with any passed server info + # When in a worker process, do some init + if os.environ.get("SANIC_WORKER_NAME"): + # Hydrate apps with any passed server info - if monitor_publisher is None: - raise RuntimeError("No restart publisher found in worker process") - if worker_state is None: - raise RuntimeError("No worker state found in worker process") + if monitor_publisher is None: + raise RuntimeError( + "No restart publisher found in worker process" + ) + if worker_state is None: + raise RuntimeError("No worker state found in worker process") - # Run secondary servers - apps = list(Sanic._app_registry.values()) - app.before_server_start(partial(app._start_servers, apps=apps)) - for a in apps: - a.multiplexer = WorkerMultiplexer(monitor_publisher, worker_state) + # Run secondary servers + apps = list(Sanic._app_registry.values()) + app.before_server_start(partial(app._start_servers, apps=apps)) + for a in apps: + a.multiplexer = WorkerMultiplexer( + monitor_publisher, worker_state + ) - if app.debug: - loop.set_debug(app.debug) + if app.debug: + loop.set_debug(app.debug) - app.asgi = False + app.asgi = False - if app.state.server_info: - primary_server_info = app.state.server_info[0] - primary_server_info.stage = ServerStage.SERVING - if config: - app.update_config(config) + if app.state.server_info: + primary_server_info = app.state.server_info[0] + primary_server_info.stage = ServerStage.SERVING + if config: + app.update_config(config) - if version is HTTP.VERSION_3: - return _serve_http_3(host, port, app, loop, ssl) - return _serve_http_1( - host, - port, - app, - ssl, - sock, - unix, - reuse_port, - loop, - protocol, - backlog, - register_sys_signals, - run_multiple, - run_async, - connections, - signal, - state, - asyncio_server_kwargs, - ) + if version is HTTP.VERSION_3: + return _serve_http_3(host, port, app, loop, ssl) + return _serve_http_1( + host, + port, + app, + ssl, + sock, + unix, + reuse_port, + loop, + protocol, + backlog, + register_sys_signals, + run_multiple, + run_async, + connections, + signal, + state, + asyncio_server_kwargs, + ) + except Exception as e: + warnings.simplefilter("ignore", category=RuntimeWarning) + if monitor_publisher: + error_logger.exception(e) + multiplexer = WorkerMultiplexer(monitor_publisher, {}) + multiplexer.terminate(True) + else: + raise e diff --git a/tests/test_unix_socket.py b/tests/test_unix_socket.py index 515c30df..adb80b9c 100644 --- a/tests/test_unix_socket.py +++ b/tests/test_unix_socket.py @@ -2,7 +2,7 @@ import logging import os -from asyncio import AbstractEventLoop +from asyncio import AbstractEventLoop, sleep from string import ascii_lowercase import httpcore @@ -179,6 +179,7 @@ async def client(app: Sanic, loop: AbstractEventLoop): assert r.status_code == 200 assert r.text == os.path.abspath(SOCKPATH) finally: + await sleep(0.2) app.stop() diff --git a/tests/worker/test_manager.py b/tests/worker/test_manager.py index f7be778f..c377b260 100644 --- a/tests/worker/test_manager.py +++ b/tests/worker/test_manager.py @@ -3,6 +3,7 @@ from unittest.mock import Mock, call, patch import pytest +from sanic.exceptions import ServerKilled from sanic.worker.manager import WorkerManager @@ -76,7 +77,8 @@ def test_kill(os_mock: Mock): (Mock(), Mock()), {}, ) - manager.kill() + with pytest.raises(ServerKilled): + manager.kill() os_mock.kill.assert_called_once_with(1234, SIGKILL) diff --git a/tests/worker/test_worker_serve.py b/tests/worker/test_worker_serve.py index bea7ffea..b24f7e8f 100644 --- a/tests/worker/test_worker_serve.py +++ b/tests/worker/test_worker_serve.py @@ -1,3 +1,5 @@ +import logging + from os import environ from unittest.mock import Mock, patch @@ -37,7 +39,7 @@ def test_config_app(mock_app: Mock): mock_app.update_config.assert_called_once_with({"FOO": "BAR"}) -def test_bad_process(mock_app: Mock): +def test_bad_process(mock_app: Mock, caplog): environ["SANIC_WORKER_NAME"] = "FOO" message = "No restart publisher found in worker process" @@ -45,8 +47,12 @@ def test_bad_process(mock_app: Mock): worker_serve(**args(mock_app)) message = "No worker state found in worker process" - with pytest.raises(RuntimeError, match=message): - worker_serve(**args(mock_app, monitor_publisher=Mock())) + publisher = Mock() + with caplog.at_level(logging.ERROR): + worker_serve(**args(mock_app, monitor_publisher=publisher)) + + assert ("sanic.error", logging.ERROR, message) in caplog.record_tuples + publisher.send.assert_called_once_with("__TERMINATE_EARLY__") del environ["SANIC_WORKER_NAME"]