From 3c4c136090ca9a5323cf7d3904ad7744098c10ed Mon Sep 17 00:00:00 2001 From: Adam Hopkins Date: Thu, 8 Dec 2022 14:46:45 +0200 Subject: [PATCH] Wait for new process to ACK before termination of old on restart --- sanic/worker/manager.py | 10 ++++++++- sanic/worker/process.py | 45 +++++++++++++++++++++++++++++++---------- 2 files changed, 43 insertions(+), 12 deletions(-) diff --git a/sanic/worker/manager.py b/sanic/worker/manager.py index ad77a138..bba4ecaf 100644 --- a/sanic/worker/manager.py +++ b/sanic/worker/manager.py @@ -18,6 +18,7 @@ else: class WorkerManager: THRESHOLD = 300 # == 30 seconds + MAIN_IDENT = "Sanic-Main" def __init__( self, @@ -34,7 +35,7 @@ class WorkerManager: self.durable: List[Worker] = [] self.monitor_publisher, self.monitor_subscriber = monitor_pubsub self.worker_state = worker_state - self.worker_state["Sanic-Main"] = {"pid": self.pid} + self.worker_state[self.MAIN_IDENT] = {"pid": self.pid} self.terminated = False if number == 0: @@ -122,11 +123,18 @@ class WorkerManager: process_names=process_names, reloaded_files=reloaded_files, ) + self._sync_states() except InterruptedError: if not OS_IS_WINDOWS: raise break + def _sync_states(self): + for process in self.processes: + state = self.worker_state[process.name]["state"] + if process.state.name != state: + process.set_state(ProcessState[state], True) + def wait_for_ack(self): # no cov misses = 0 message = ( diff --git a/sanic/worker/process.py b/sanic/worker/process.py index e6a4d484..d19e3cd4 100644 --- a/sanic/worker/process.py +++ b/sanic/worker/process.py @@ -4,6 +4,7 @@ from datetime import datetime, timezone from enum import IntEnum, auto from multiprocessing.context import BaseContext from signal import SIGINT +from threading import Thread from typing import Any, Dict, Set from sanic.log import Colors, logger @@ -16,6 +17,8 @@ def get_now(): class ProcessState(IntEnum): IDLE = auto() + RESTARTING = auto() + STARTING = auto() STARTED = auto() ACKED = auto() JOINED = auto() @@ -54,8 +57,9 @@ class WorkerProcess: f"{Colors.SANIC}%s{Colors.END}", self.name, ) + self.set_state(ProcessState.STARTING) + self._current_process.start() self.set_state(ProcessState.STARTED) - self._process.start() if not self.worker_state[self.name].get("starts"): self.worker_state[self.name] = { **self.worker_state[self.name], @@ -67,7 +71,7 @@ class WorkerProcess: def join(self): self.set_state(ProcessState.JOINED) - self._process.join() + self._current_process.join() def terminate(self): if self.state is not ProcessState.TERMINATED: @@ -80,7 +84,6 @@ class WorkerProcess: ) self.set_state(ProcessState.TERMINATED, force=True) try: - # self._process.terminate() os.kill(self.pid, SIGINT) del self.worker_state[self.name] except (KeyError, AttributeError, ProcessLookupError): @@ -93,8 +96,8 @@ class WorkerProcess: self.name, self.pid, ) - self._process.terminate() - self.set_state(ProcessState.IDLE, force=True) + self._old_process = self._current_process + self.set_state(ProcessState.RESTARTING, force=True) self.kwargs.update( {"config": {k.upper(): v for k, v in kwargs.items()}} ) @@ -104,6 +107,9 @@ class WorkerProcess: except AttributeError: raise RuntimeError("Restart failed") + termination_thread = Thread(target=self.wait_to_terminate) + termination_thread.start() + self.worker_state[self.name] = { **self.worker_state[self.name], "pid": self.pid, @@ -111,16 +117,31 @@ class WorkerProcess: "restart_at": get_now(), } + def wait_to_terminate(self): + # TODO: Add a timeout + while self.state is not ProcessState.ACKED: + ... + else: + logger.debug( + f"{Colors.BLUE}Process acked. Terminating: " + f"{Colors.BOLD}{Colors.SANIC}" + f"%s {Colors.BLUE}[%s]{Colors.END}", + self.name, + self._old_process.pid, + ) + self._old_process.terminate() + delattr(self, "_old_process") + def is_alive(self): try: - return self._process.is_alive() + return self._current_process.is_alive() except AssertionError: return False def spawn(self): - if self.state is not ProcessState.IDLE: + if self.state not in (ProcessState.IDLE, ProcessState.RESTARTING): raise Exception("Cannot spawn a worker process until it is idle.") - self._process = self.factory( + self._current_process = self.factory( name=self.name, target=self.target, kwargs=self.kwargs, @@ -129,10 +150,12 @@ class WorkerProcess: @property def pid(self): - return self._process.pid + return self._current_process.pid class Worker: + WORKER_PREFIX = "SANIC-" + def __init__( self, ident: str, @@ -141,7 +164,7 @@ class Worker: context: BaseContext, worker_state: Dict[str, Any], ): - self.ident = ident + self.ident = f"{self.WORKER_PREFIX}{ident}" self.context = context self.serve = serve self.server_settings = server_settings @@ -152,7 +175,7 @@ class Worker: def create_process(self) -> WorkerProcess: process = WorkerProcess( factory=self.context.Process, - name=f"Sanic-{self.ident}-{len(self.processes)}", + name=f"{self.ident}-{len(self.processes)}", target=self.serve, kwargs={**self.server_settings}, worker_state=self.worker_state,