Compare commits

...

1 Commits

Author SHA1 Message Date
Adam Hopkins
e0839fe130
Cleaner process management 2023-08-29 19:17:50 +03:00
6 changed files with 92 additions and 14 deletions

View File

@ -5,7 +5,6 @@ import logging
import logging.config
import re
import sys
from asyncio import (
AbstractEventLoop,
CancelledError,
@ -94,7 +93,6 @@ from sanic.worker.inspector import Inspector
from sanic.worker.loader import CertLoader
from sanic.worker.manager import WorkerManager
if TYPE_CHECKING:
try:
from sanic_ext import Extend # type: ignore
@ -1743,6 +1741,20 @@ class Sanic(
if hasattr(self, "multiplexer"):
self.multiplexer.ack()
def set_serving(self, serving: bool) -> None:
"""Set the serving state of the application.
This method is used to set the serving state of the application.
It is used internally by Sanic and should not typically be called
manually.
Args:
serving (bool): Whether the application is serving.
"""
self.state.is_running = serving
if hasattr(self, "multiplexer"):
self.multiplexer.set_serving(serving)
async def _server_event(
self,
concern: str,

View File

@ -3,7 +3,6 @@ from __future__ import annotations
import os
import platform
import sys
from asyncio import (
AbstractEventLoop,
CancelledError,
@ -65,13 +64,13 @@ from sanic.server.protocols.http_protocol import HttpProtocol
from sanic.server.protocols.websocket_protocol import WebSocketProtocol
from sanic.server.runners import serve
from sanic.server.socket import configure_socket, remove_unix_socket
from sanic.worker.constants import ProcessState
from sanic.worker.loader import AppLoader
from sanic.worker.manager import WorkerManager
from sanic.worker.multiplexer import WorkerMultiplexer
from sanic.worker.reloader import Reloader
from sanic.worker.serve import worker_serve
if TYPE_CHECKING:
from sanic import Sanic
from sanic.application.state import ApplicationState
@ -893,7 +892,6 @@ class StartupMixin(metaclass=SanicMeta):
app.router.reset()
app.signal_router.reset()
sync_manager.shutdown()
for sock in socks:
try:
sock.shutdown(SHUT_RDWR)
@ -905,12 +903,33 @@ class StartupMixin(metaclass=SanicMeta):
loop.close()
cls._cleanup_env_vars()
cls._cleanup_apps()
from time import sleep
limit = 100
while cls._get_process_states(worker_state):
sleep(0.1)
limit -= 1
if limit <= 0:
error_logger.warning(
"Worker shutdown timed out. "
"Some processes may still be running."
)
break
sync_manager.shutdown()
unix = kwargs.get("unix")
if unix:
remove_unix_socket(unix)
logger.info("Goodbye.")
if exit_code:
os._exit(exit_code)
@staticmethod
def _get_process_states(worker_state) -> List[str]:
return [
state for s in worker_state.values() if (state := s.get("state"))
]
@classmethod
def serve_single(cls, primary: Optional[Sanic] = None) -> None:
os.environ["SANIC_MOTD_OUTPUT"] = "true"

View File

@ -122,17 +122,15 @@ def _setup_system_signals(
register_sys_signals: bool,
loop: asyncio.AbstractEventLoop,
) -> None: # no cov
# Ignore SIGINT when run_multiple
if run_multiple:
signal_func(SIGINT, SIG_IGN)
os.environ["SANIC_WORKER_PROCESS"] = "true"
signal_func(SIGINT, SIG_IGN)
signal_func(SIGTERM, SIG_IGN)
os.environ["SANIC_WORKER_PROCESS"] = "true"
# Register signals for graceful termination
if register_sys_signals:
if OS_IS_WINDOWS:
ctrlc_workaround_for_windows(app)
else:
for _signal in [SIGTERM] if run_multiple else [SIGINT, SIGTERM]:
for _signal in [SIGINT, SIGTERM]:
loop.add_signal_handler(
_signal, partial(app.stop, terminate=False)
)
@ -143,8 +141,6 @@ def _run_server_forever(loop, before_stop, after_stop, cleanup, unix):
try:
server_logger.info("Starting worker [%s]", pid)
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
server_logger.info("Stopping worker [%s]", pid)
@ -156,6 +152,7 @@ def _run_server_forever(loop, before_stop, after_stop, cleanup, unix):
loop.run_until_complete(after_stop())
remove_unix_socket(unix)
loop.close()
server_logger.info("Worker complete [%s]", pid)
def _serve_http_1(
@ -259,8 +256,11 @@ def _serve_http_1(
else:
conn.abort()
app.set_serving(False)
_setup_system_signals(app, run_multiple, register_sys_signals, loop)
loop.run_until_complete(app._server_event("init", "after"))
app.set_serving(True)
_run_server_forever(
loop,
partial(app._server_event, "shutdown", "before"),

View File

@ -122,6 +122,7 @@ class WorkerManager:
self.monitor()
self.join()
self.terminate()
self.cleanup()
def start(self):
for process in self.processes:
@ -147,6 +148,11 @@ class WorkerManager:
for process in self.processes:
process.terminate()
def cleanup(self):
"""Cleanup the worker processes."""
for process in self.processes:
process.exit()
def restart(
self,
process_names: Optional[List[str]] = None,

View File

@ -28,6 +28,24 @@ class WorkerMultiplexer:
"state": ProcessState.ACKED.name,
}
def set_serving(self, serving: bool) -> None:
"""Set the worker to serving.
Args:
serving (bool): Whether the worker is serving.
"""
self._state._state[self.name] = {
**self._state._state[self.name],
"serving": serving,
}
def exit(self):
"""Run cleanup at worker exit."""
try:
del self._state._state[self.name]
except ConnectionRefusedError:
logger.debug("Monitor process has already exited.")
def restart(
self,
name: str = "",

View File

@ -65,6 +65,20 @@ class WorkerProcess:
self.set_state(ProcessState.JOINED)
self._current_process.join()
def exit(self):
limit = 100
while self.is_alive() and limit > 0:
sleep(0.1)
limit -= 1
if not self.is_alive():
try:
del self.worker_state[self.name]
except ConnectionRefusedError:
logger.debug("Monitor process has already exited.")
except KeyError:
logger.debug("Could not find worker state to delete.")
def terminate(self):
if self.state is not ProcessState.TERMINATED:
logger.debug(
@ -77,7 +91,6 @@ class WorkerProcess:
self.set_state(ProcessState.TERMINATED, force=True)
try:
os.kill(self.pid, SIGINT)
del self.worker_state[self.name]
except (KeyError, AttributeError, ProcessLookupError):
...
@ -118,6 +131,16 @@ class WorkerProcess:
except AssertionError:
return False
# def _run(self, **kwargs):
# atexit.register(self._exit)
# self.target(**kwargs)
# def _exit(self):
# try:
# del self.worker_state[self.name]
# except ConnectionRefusedError:
# logger.debug("Monitor process has already exited.")
def spawn(self):
if self.state not in (ProcessState.IDLE, ProcessState.RESTARTING):
raise Exception("Cannot spawn a worker process until it is idle.")