Wait for new process to ACK before termination of old on restart

This commit is contained in:
Adam Hopkins 2022-12-08 14:46:45 +02:00
parent d4041161c7
commit 3c4c136090
No known key found for this signature in database
GPG Key ID: 9F85EE6C807303FB
2 changed files with 43 additions and 12 deletions

View File

@ -18,6 +18,7 @@ else:
class WorkerManager: class WorkerManager:
THRESHOLD = 300 # == 30 seconds THRESHOLD = 300 # == 30 seconds
MAIN_IDENT = "Sanic-Main"
def __init__( def __init__(
self, self,
@ -34,7 +35,7 @@ class WorkerManager:
self.durable: List[Worker] = [] self.durable: List[Worker] = []
self.monitor_publisher, self.monitor_subscriber = monitor_pubsub self.monitor_publisher, self.monitor_subscriber = monitor_pubsub
self.worker_state = worker_state self.worker_state = worker_state
self.worker_state["Sanic-Main"] = {"pid": self.pid} self.worker_state[self.MAIN_IDENT] = {"pid": self.pid}
self.terminated = False self.terminated = False
if number == 0: if number == 0:
@ -122,11 +123,18 @@ class WorkerManager:
process_names=process_names, process_names=process_names,
reloaded_files=reloaded_files, reloaded_files=reloaded_files,
) )
self._sync_states()
except InterruptedError: except InterruptedError:
if not OS_IS_WINDOWS: if not OS_IS_WINDOWS:
raise raise
break break
def _sync_states(self):
for process in self.processes:
state = self.worker_state[process.name]["state"]
if process.state.name != state:
process.set_state(ProcessState[state], True)
def wait_for_ack(self): # no cov def wait_for_ack(self): # no cov
misses = 0 misses = 0
message = ( message = (

View File

@ -4,6 +4,7 @@ from datetime import datetime, timezone
from enum import IntEnum, auto from enum import IntEnum, auto
from multiprocessing.context import BaseContext from multiprocessing.context import BaseContext
from signal import SIGINT from signal import SIGINT
from threading import Thread
from typing import Any, Dict, Set from typing import Any, Dict, Set
from sanic.log import Colors, logger from sanic.log import Colors, logger
@ -16,6 +17,8 @@ def get_now():
class ProcessState(IntEnum): class ProcessState(IntEnum):
IDLE = auto() IDLE = auto()
RESTARTING = auto()
STARTING = auto()
STARTED = auto() STARTED = auto()
ACKED = auto() ACKED = auto()
JOINED = auto() JOINED = auto()
@ -54,8 +57,9 @@ class WorkerProcess:
f"{Colors.SANIC}%s{Colors.END}", f"{Colors.SANIC}%s{Colors.END}",
self.name, self.name,
) )
self.set_state(ProcessState.STARTING)
self._current_process.start()
self.set_state(ProcessState.STARTED) self.set_state(ProcessState.STARTED)
self._process.start()
if not self.worker_state[self.name].get("starts"): if not self.worker_state[self.name].get("starts"):
self.worker_state[self.name] = { self.worker_state[self.name] = {
**self.worker_state[self.name], **self.worker_state[self.name],
@ -67,7 +71,7 @@ class WorkerProcess:
def join(self): def join(self):
self.set_state(ProcessState.JOINED) self.set_state(ProcessState.JOINED)
self._process.join() self._current_process.join()
def terminate(self): def terminate(self):
if self.state is not ProcessState.TERMINATED: if self.state is not ProcessState.TERMINATED:
@ -80,7 +84,6 @@ class WorkerProcess:
) )
self.set_state(ProcessState.TERMINATED, force=True) self.set_state(ProcessState.TERMINATED, force=True)
try: try:
# self._process.terminate()
os.kill(self.pid, SIGINT) os.kill(self.pid, SIGINT)
del self.worker_state[self.name] del self.worker_state[self.name]
except (KeyError, AttributeError, ProcessLookupError): except (KeyError, AttributeError, ProcessLookupError):
@ -93,8 +96,8 @@ class WorkerProcess:
self.name, self.name,
self.pid, self.pid,
) )
self._process.terminate() self._old_process = self._current_process
self.set_state(ProcessState.IDLE, force=True) self.set_state(ProcessState.RESTARTING, force=True)
self.kwargs.update( self.kwargs.update(
{"config": {k.upper(): v for k, v in kwargs.items()}} {"config": {k.upper(): v for k, v in kwargs.items()}}
) )
@ -104,6 +107,9 @@ class WorkerProcess:
except AttributeError: except AttributeError:
raise RuntimeError("Restart failed") raise RuntimeError("Restart failed")
termination_thread = Thread(target=self.wait_to_terminate)
termination_thread.start()
self.worker_state[self.name] = { self.worker_state[self.name] = {
**self.worker_state[self.name], **self.worker_state[self.name],
"pid": self.pid, "pid": self.pid,
@ -111,16 +117,31 @@ class WorkerProcess:
"restart_at": get_now(), "restart_at": get_now(),
} }
def wait_to_terminate(self):
# TODO: Add a timeout
while self.state is not ProcessState.ACKED:
...
else:
logger.debug(
f"{Colors.BLUE}Process acked. Terminating: "
f"{Colors.BOLD}{Colors.SANIC}"
f"%s {Colors.BLUE}[%s]{Colors.END}",
self.name,
self._old_process.pid,
)
self._old_process.terminate()
delattr(self, "_old_process")
def is_alive(self): def is_alive(self):
try: try:
return self._process.is_alive() return self._current_process.is_alive()
except AssertionError: except AssertionError:
return False return False
def spawn(self): def spawn(self):
if self.state is not ProcessState.IDLE: if self.state not in (ProcessState.IDLE, ProcessState.RESTARTING):
raise Exception("Cannot spawn a worker process until it is idle.") raise Exception("Cannot spawn a worker process until it is idle.")
self._process = self.factory( self._current_process = self.factory(
name=self.name, name=self.name,
target=self.target, target=self.target,
kwargs=self.kwargs, kwargs=self.kwargs,
@ -129,10 +150,12 @@ class WorkerProcess:
@property @property
def pid(self): def pid(self):
return self._process.pid return self._current_process.pid
class Worker: class Worker:
WORKER_PREFIX = "SANIC-"
def __init__( def __init__(
self, self,
ident: str, ident: str,
@ -141,7 +164,7 @@ class Worker:
context: BaseContext, context: BaseContext,
worker_state: Dict[str, Any], worker_state: Dict[str, Any],
): ):
self.ident = ident self.ident = f"{self.WORKER_PREFIX}{ident}"
self.context = context self.context = context
self.serve = serve self.serve = serve
self.server_settings = server_settings self.server_settings = server_settings
@ -152,7 +175,7 @@ class Worker:
def create_process(self) -> WorkerProcess: def create_process(self) -> WorkerProcess:
process = WorkerProcess( process = WorkerProcess(
factory=self.context.Process, factory=self.context.Process,
name=f"Sanic-{self.ident}-{len(self.processes)}", name=f"{self.ident}-{len(self.processes)}",
target=self.serve, target=self.serve,
kwargs={**self.server_settings}, kwargs={**self.server_settings},
worker_state=self.worker_state, worker_state=self.worker_state,