Kill server early on worker error (#2610)

This commit is contained in:
Adam Hopkins 2022-12-07 14:42:17 +02:00 committed by GitHub
parent 0909e94527
commit f32437bf13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 137 additions and 78 deletions

View File

@ -8,6 +8,10 @@ class RequestCancelled(CancelledError):
quiet = True
class ServerKilled(Exception):
...
class SanicException(Exception):
message: str = ""

View File

@ -41,6 +41,7 @@ from sanic.application.motd import MOTD
from sanic.application.state import ApplicationServerInfo, Mode, ServerStage
from sanic.base.meta import SanicMeta
from sanic.compat import OS_IS_WINDOWS, is_atty
from sanic.exceptions import ServerKilled
from sanic.helpers import Default
from sanic.http.constants import HTTP
from sanic.http.tls import get_ssl_context, process_to_context
@ -740,6 +741,7 @@ class StartupMixin(metaclass=SanicMeta):
socks = []
sync_manager = Manager()
setup_ext(primary)
exit_code = 0
try:
primary_server_info.settings.pop("main_start", None)
primary_server_info.settings.pop("main_stop", None)
@ -849,6 +851,8 @@ class StartupMixin(metaclass=SanicMeta):
trigger_events(ready, loop, primary)
manager.run()
except ServerKilled:
exit_code = 1
except BaseException:
kwargs = primary_server_info.settings
error_logger.exception(
@ -874,6 +878,8 @@ class StartupMixin(metaclass=SanicMeta):
unix = kwargs.get("unix")
if unix:
remove_unix_socket(unix)
if exit_code:
os._exit(exit_code)
@classmethod
def serve_single(cls, primary: Optional[Sanic] = None) -> None:

View File

@ -1,12 +1,11 @@
import os
import sys
from signal import SIGINT, SIGTERM, Signals
from signal import signal as signal_func
from time import sleep
from typing import List, Optional
from sanic.compat import OS_IS_WINDOWS
from sanic.exceptions import ServerKilled
from sanic.log import error_logger, logger
from sanic.worker.process import ProcessState, Worker, WorkerProcess
@ -18,7 +17,7 @@ else:
class WorkerManager:
THRESHOLD = 50
THRESHOLD = 300 # == 30 seconds
def __init__(
self,
@ -130,13 +129,36 @@ class WorkerManager:
def wait_for_ack(self): # no cov
misses = 0
message = (
"It seems that one or more of your workers failed to come "
"online in the allowed time. Sanic is shutting down to avoid a "
f"deadlock. The current threshold is {self.THRESHOLD / 10}s. "
"If this problem persists, please check out the documentation "
"___."
)
while not self._all_workers_ack():
sleep(0.1)
if self.monitor_subscriber.poll(0.1):
monitor_msg = self.monitor_subscriber.recv()
if monitor_msg != "__TERMINATE_EARLY__":
self.monitor_publisher.send(monitor_msg)
continue
misses = self.THRESHOLD
message = (
"One of your worker processes terminated before startup "
"was completed. Please solve any errors experienced "
"during startup. If you do not see an exception traceback "
"in your error logs, try running Sanic in in a single "
"process using --single-process or single_process=True. "
"Once you are confident that the server is able to start "
"without errors you can switch back to multiprocess mode."
)
misses += 1
if misses > self.THRESHOLD:
error_logger.error("Not all workers are ack. Shutting down.")
error_logger.error(
"Not all workers acknowledged a successful startup. "
"Shutting down.\n\n" + message
)
self.kill()
sys.exit(1)
@property
def workers(self):
@ -156,7 +178,9 @@ class WorkerManager:
def kill(self):
for process in self.processes:
logger.info("Killing %s [%s]", process.name, process.pid)
os.kill(process.pid, SIGKILL)
raise ServerKilled
def shutdown_signal(self, signal, frame):
logger.info("Received signal %s. Shutting down.", Signals(signal).name)

View File

@ -28,8 +28,9 @@ class WorkerMultiplexer:
reload = restart # no cov
def terminate(self):
self._monitor_publisher.send("__TERMINATE__")
def terminate(self, early: bool = False):
message = "__TERMINATE_EARLY__" if early else "__TERMINATE__"
self._monitor_publisher.send(message)
@property
def pid(self) -> int:

View File

@ -1,6 +1,7 @@
import asyncio
import os
import socket
import warnings
from functools import partial
from multiprocessing.connection import Connection
@ -10,6 +11,7 @@ from typing import Any, Dict, List, Optional, Type, Union
from sanic.application.constants import ServerStage
from sanic.application.state import ApplicationServerInfo
from sanic.http.constants import HTTP
from sanic.log import error_logger
from sanic.models.server_types import Signal
from sanic.server.protocols.http_protocol import HttpProtocol
from sanic.server.runners import _serve_http_1, _serve_http_3
@ -45,80 +47,93 @@ def worker_serve(
config=None,
passthru: Optional[Dict[str, Any]] = None,
):
from sanic import Sanic
try:
from sanic import Sanic
if app_loader:
app = app_loader.load()
else:
app = Sanic.get_app(app_name)
if app_loader:
app = app_loader.load()
else:
app = Sanic.get_app(app_name)
app.refresh(passthru)
app.setup_loop()
app.refresh(passthru)
app.setup_loop()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Hydrate server info if needed
if server_info:
for app_name, server_info_objects in server_info.items():
a = Sanic.get_app(app_name)
if not a.state.server_info:
a.state.server_info = []
for info in server_info_objects:
if not info.settings.get("app"):
info.settings["app"] = a
a.state.server_info.append(info)
# Hydrate server info if needed
if server_info:
for app_name, server_info_objects in server_info.items():
a = Sanic.get_app(app_name)
if not a.state.server_info:
a.state.server_info = []
for info in server_info_objects:
if not info.settings.get("app"):
info.settings["app"] = a
a.state.server_info.append(info)
if isinstance(ssl, dict):
cert_loader = CertLoader(ssl)
ssl = cert_loader.load(app)
for info in app.state.server_info:
info.settings["ssl"] = ssl
if isinstance(ssl, dict):
cert_loader = CertLoader(ssl)
ssl = cert_loader.load(app)
for info in app.state.server_info:
info.settings["ssl"] = ssl
# When in a worker process, do some init
if os.environ.get("SANIC_WORKER_NAME"):
# Hydrate apps with any passed server info
# When in a worker process, do some init
if os.environ.get("SANIC_WORKER_NAME"):
# Hydrate apps with any passed server info
if monitor_publisher is None:
raise RuntimeError("No restart publisher found in worker process")
if worker_state is None:
raise RuntimeError("No worker state found in worker process")
if monitor_publisher is None:
raise RuntimeError(
"No restart publisher found in worker process"
)
if worker_state is None:
raise RuntimeError("No worker state found in worker process")
# Run secondary servers
apps = list(Sanic._app_registry.values())
app.before_server_start(partial(app._start_servers, apps=apps))
for a in apps:
a.multiplexer = WorkerMultiplexer(monitor_publisher, worker_state)
# Run secondary servers
apps = list(Sanic._app_registry.values())
app.before_server_start(partial(app._start_servers, apps=apps))
for a in apps:
a.multiplexer = WorkerMultiplexer(
monitor_publisher, worker_state
)
if app.debug:
loop.set_debug(app.debug)
if app.debug:
loop.set_debug(app.debug)
app.asgi = False
app.asgi = False
if app.state.server_info:
primary_server_info = app.state.server_info[0]
primary_server_info.stage = ServerStage.SERVING
if config:
app.update_config(config)
if app.state.server_info:
primary_server_info = app.state.server_info[0]
primary_server_info.stage = ServerStage.SERVING
if config:
app.update_config(config)
if version is HTTP.VERSION_3:
return _serve_http_3(host, port, app, loop, ssl)
return _serve_http_1(
host,
port,
app,
ssl,
sock,
unix,
reuse_port,
loop,
protocol,
backlog,
register_sys_signals,
run_multiple,
run_async,
connections,
signal,
state,
asyncio_server_kwargs,
)
if version is HTTP.VERSION_3:
return _serve_http_3(host, port, app, loop, ssl)
return _serve_http_1(
host,
port,
app,
ssl,
sock,
unix,
reuse_port,
loop,
protocol,
backlog,
register_sys_signals,
run_multiple,
run_async,
connections,
signal,
state,
asyncio_server_kwargs,
)
except Exception as e:
warnings.simplefilter("ignore", category=RuntimeWarning)
if monitor_publisher:
error_logger.exception(e)
multiplexer = WorkerMultiplexer(monitor_publisher, {})
multiplexer.terminate(True)
else:
raise e

View File

@ -2,7 +2,7 @@
import logging
import os
from asyncio import AbstractEventLoop
from asyncio import AbstractEventLoop, sleep
from string import ascii_lowercase
import httpcore
@ -179,6 +179,7 @@ async def client(app: Sanic, loop: AbstractEventLoop):
assert r.status_code == 200
assert r.text == os.path.abspath(SOCKPATH)
finally:
await sleep(0.2)
app.stop()

View File

@ -3,6 +3,7 @@ from unittest.mock import Mock, call, patch
import pytest
from sanic.exceptions import ServerKilled
from sanic.worker.manager import WorkerManager
@ -76,7 +77,8 @@ def test_kill(os_mock: Mock):
(Mock(), Mock()),
{},
)
manager.kill()
with pytest.raises(ServerKilled):
manager.kill()
os_mock.kill.assert_called_once_with(1234, SIGKILL)

View File

@ -1,3 +1,5 @@
import logging
from os import environ
from unittest.mock import Mock, patch
@ -37,7 +39,7 @@ def test_config_app(mock_app: Mock):
mock_app.update_config.assert_called_once_with({"FOO": "BAR"})
def test_bad_process(mock_app: Mock):
def test_bad_process(mock_app: Mock, caplog):
environ["SANIC_WORKER_NAME"] = "FOO"
message = "No restart publisher found in worker process"
@ -45,8 +47,12 @@ def test_bad_process(mock_app: Mock):
worker_serve(**args(mock_app))
message = "No worker state found in worker process"
with pytest.raises(RuntimeError, match=message):
worker_serve(**args(mock_app, monitor_publisher=Mock()))
publisher = Mock()
with caplog.at_level(logging.ERROR):
worker_serve(**args(mock_app, monitor_publisher=publisher))
assert ("sanic.error", logging.ERROR, message) in caplog.record_tuples
publisher.send.assert_called_once_with("__TERMINATE_EARLY__")
del environ["SANIC_WORKER_NAME"]