Cleaner process management

This commit is contained in:
Adam Hopkins 2023-08-29 19:17:50 +03:00
parent 31d14704cb
commit e0839fe130
No known key found for this signature in database
GPG Key ID: 9F85EE6C807303FB
6 changed files with 92 additions and 14 deletions

View File

@ -5,7 +5,6 @@ import logging
import logging.config import logging.config
import re import re
import sys import sys
from asyncio import ( from asyncio import (
AbstractEventLoop, AbstractEventLoop,
CancelledError, CancelledError,
@ -94,7 +93,6 @@ from sanic.worker.inspector import Inspector
from sanic.worker.loader import CertLoader from sanic.worker.loader import CertLoader
from sanic.worker.manager import WorkerManager from sanic.worker.manager import WorkerManager
if TYPE_CHECKING: if TYPE_CHECKING:
try: try:
from sanic_ext import Extend # type: ignore from sanic_ext import Extend # type: ignore
@ -1743,6 +1741,20 @@ class Sanic(
if hasattr(self, "multiplexer"): if hasattr(self, "multiplexer"):
self.multiplexer.ack() self.multiplexer.ack()
def set_serving(self, serving: bool) -> None:
"""Set the serving state of the application.
This method is used to set the serving state of the application.
It is used internally by Sanic and should not typically be called
manually.
Args:
serving (bool): Whether the application is serving.
"""
self.state.is_running = serving
if hasattr(self, "multiplexer"):
self.multiplexer.set_serving(serving)
async def _server_event( async def _server_event(
self, self,
concern: str, concern: str,

View File

@ -3,7 +3,6 @@ from __future__ import annotations
import os import os
import platform import platform
import sys import sys
from asyncio import ( from asyncio import (
AbstractEventLoop, AbstractEventLoop,
CancelledError, CancelledError,
@ -65,13 +64,13 @@ from sanic.server.protocols.http_protocol import HttpProtocol
from sanic.server.protocols.websocket_protocol import WebSocketProtocol from sanic.server.protocols.websocket_protocol import WebSocketProtocol
from sanic.server.runners import serve from sanic.server.runners import serve
from sanic.server.socket import configure_socket, remove_unix_socket from sanic.server.socket import configure_socket, remove_unix_socket
from sanic.worker.constants import ProcessState
from sanic.worker.loader import AppLoader from sanic.worker.loader import AppLoader
from sanic.worker.manager import WorkerManager from sanic.worker.manager import WorkerManager
from sanic.worker.multiplexer import WorkerMultiplexer from sanic.worker.multiplexer import WorkerMultiplexer
from sanic.worker.reloader import Reloader from sanic.worker.reloader import Reloader
from sanic.worker.serve import worker_serve from sanic.worker.serve import worker_serve
if TYPE_CHECKING: if TYPE_CHECKING:
from sanic import Sanic from sanic import Sanic
from sanic.application.state import ApplicationState from sanic.application.state import ApplicationState
@ -893,7 +892,6 @@ class StartupMixin(metaclass=SanicMeta):
app.router.reset() app.router.reset()
app.signal_router.reset() app.signal_router.reset()
sync_manager.shutdown()
for sock in socks: for sock in socks:
try: try:
sock.shutdown(SHUT_RDWR) sock.shutdown(SHUT_RDWR)
@ -905,12 +903,33 @@ class StartupMixin(metaclass=SanicMeta):
loop.close() loop.close()
cls._cleanup_env_vars() cls._cleanup_env_vars()
cls._cleanup_apps() cls._cleanup_apps()
from time import sleep
limit = 100
while cls._get_process_states(worker_state):
sleep(0.1)
limit -= 1
if limit <= 0:
error_logger.warning(
"Worker shutdown timed out. "
"Some processes may still be running."
)
break
sync_manager.shutdown()
unix = kwargs.get("unix") unix = kwargs.get("unix")
if unix: if unix:
remove_unix_socket(unix) remove_unix_socket(unix)
logger.info("Goodbye.")
if exit_code: if exit_code:
os._exit(exit_code) os._exit(exit_code)
@staticmethod
def _get_process_states(worker_state) -> List[str]:
return [
state for s in worker_state.values() if (state := s.get("state"))
]
@classmethod @classmethod
def serve_single(cls, primary: Optional[Sanic] = None) -> None: def serve_single(cls, primary: Optional[Sanic] = None) -> None:
os.environ["SANIC_MOTD_OUTPUT"] = "true" os.environ["SANIC_MOTD_OUTPUT"] = "true"

View File

@ -122,17 +122,15 @@ def _setup_system_signals(
register_sys_signals: bool, register_sys_signals: bool,
loop: asyncio.AbstractEventLoop, loop: asyncio.AbstractEventLoop,
) -> None: # no cov ) -> None: # no cov
# Ignore SIGINT when run_multiple signal_func(SIGINT, SIG_IGN)
if run_multiple: signal_func(SIGTERM, SIG_IGN)
signal_func(SIGINT, SIG_IGN) os.environ["SANIC_WORKER_PROCESS"] = "true"
os.environ["SANIC_WORKER_PROCESS"] = "true"
# Register signals for graceful termination # Register signals for graceful termination
if register_sys_signals: if register_sys_signals:
if OS_IS_WINDOWS: if OS_IS_WINDOWS:
ctrlc_workaround_for_windows(app) ctrlc_workaround_for_windows(app)
else: else:
for _signal in [SIGTERM] if run_multiple else [SIGINT, SIGTERM]: for _signal in [SIGINT, SIGTERM]:
loop.add_signal_handler( loop.add_signal_handler(
_signal, partial(app.stop, terminate=False) _signal, partial(app.stop, terminate=False)
) )
@ -143,8 +141,6 @@ def _run_server_forever(loop, before_stop, after_stop, cleanup, unix):
try: try:
server_logger.info("Starting worker [%s]", pid) server_logger.info("Starting worker [%s]", pid)
loop.run_forever() loop.run_forever()
except KeyboardInterrupt:
pass
finally: finally:
server_logger.info("Stopping worker [%s]", pid) server_logger.info("Stopping worker [%s]", pid)
@ -156,6 +152,7 @@ def _run_server_forever(loop, before_stop, after_stop, cleanup, unix):
loop.run_until_complete(after_stop()) loop.run_until_complete(after_stop())
remove_unix_socket(unix) remove_unix_socket(unix)
loop.close() loop.close()
server_logger.info("Worker complete [%s]", pid)
def _serve_http_1( def _serve_http_1(
@ -259,8 +256,11 @@ def _serve_http_1(
else: else:
conn.abort() conn.abort()
app.set_serving(False)
_setup_system_signals(app, run_multiple, register_sys_signals, loop) _setup_system_signals(app, run_multiple, register_sys_signals, loop)
loop.run_until_complete(app._server_event("init", "after")) loop.run_until_complete(app._server_event("init", "after"))
app.set_serving(True)
_run_server_forever( _run_server_forever(
loop, loop,
partial(app._server_event, "shutdown", "before"), partial(app._server_event, "shutdown", "before"),

View File

@ -122,6 +122,7 @@ class WorkerManager:
self.monitor() self.monitor()
self.join() self.join()
self.terminate() self.terminate()
self.cleanup()
def start(self): def start(self):
for process in self.processes: for process in self.processes:
@ -147,6 +148,11 @@ class WorkerManager:
for process in self.processes: for process in self.processes:
process.terminate() process.terminate()
def cleanup(self):
"""Cleanup the worker processes."""
for process in self.processes:
process.exit()
def restart( def restart(
self, self,
process_names: Optional[List[str]] = None, process_names: Optional[List[str]] = None,

View File

@ -28,6 +28,24 @@ class WorkerMultiplexer:
"state": ProcessState.ACKED.name, "state": ProcessState.ACKED.name,
} }
def set_serving(self, serving: bool) -> None:
"""Set the worker to serving.
Args:
serving (bool): Whether the worker is serving.
"""
self._state._state[self.name] = {
**self._state._state[self.name],
"serving": serving,
}
def exit(self):
"""Run cleanup at worker exit."""
try:
del self._state._state[self.name]
except ConnectionRefusedError:
logger.debug("Monitor process has already exited.")
def restart( def restart(
self, self,
name: str = "", name: str = "",

View File

@ -65,6 +65,20 @@ class WorkerProcess:
self.set_state(ProcessState.JOINED) self.set_state(ProcessState.JOINED)
self._current_process.join() self._current_process.join()
def exit(self):
limit = 100
while self.is_alive() and limit > 0:
sleep(0.1)
limit -= 1
if not self.is_alive():
try:
del self.worker_state[self.name]
except ConnectionRefusedError:
logger.debug("Monitor process has already exited.")
except KeyError:
logger.debug("Could not find worker state to delete.")
def terminate(self): def terminate(self):
if self.state is not ProcessState.TERMINATED: if self.state is not ProcessState.TERMINATED:
logger.debug( logger.debug(
@ -77,7 +91,6 @@ class WorkerProcess:
self.set_state(ProcessState.TERMINATED, force=True) self.set_state(ProcessState.TERMINATED, force=True)
try: try:
os.kill(self.pid, SIGINT) os.kill(self.pid, SIGINT)
del self.worker_state[self.name]
except (KeyError, AttributeError, ProcessLookupError): except (KeyError, AttributeError, ProcessLookupError):
... ...
@ -118,6 +131,16 @@ class WorkerProcess:
except AssertionError: except AssertionError:
return False return False
# def _run(self, **kwargs):
# atexit.register(self._exit)
# self.target(**kwargs)
# def _exit(self):
# try:
# del self.worker_state[self.name]
# except ConnectionRefusedError:
# logger.debug("Monitor process has already exited.")
def spawn(self): def spawn(self):
if self.state not in (ProcessState.IDLE, ProcessState.RESTARTING): if self.state not in (ProcessState.IDLE, ProcessState.RESTARTING):
raise Exception("Cannot spawn a worker process until it is idle.") raise Exception("Cannot spawn a worker process until it is idle.")