Reorganize tests

This commit is contained in:
Adam Hopkins 2022-12-12 11:44:42 +02:00
parent 3536c0af27
commit f034a31d29
No known key found for this signature in database
GPG Key ID: 9F85EE6C807303FB
2 changed files with 66 additions and 56 deletions

View File

@ -152,6 +152,13 @@ class WorkerProcess:
termination_thread.start()
def _wait_to_terminate(self):
logger.debug(
f"{Colors.BLUE}Waiting for process to be acked: "
f"{Colors.BOLD}{Colors.SANIC}"
f"%s {Colors.BLUE}[%s]{Colors.END}",
self.name,
self._old_process.pid,
)
# TODO: Add a timeout?
while self.state is not ProcessState.ACKED:
...

View File

@ -5,7 +5,7 @@ import threading
from asyncio import Event
from logging import DEBUG
from pathlib import Path
from unittest.mock import Mock, patch
from unittest.mock import Mock
import pytest
@ -72,6 +72,64 @@ def test_iter_files():
assert len_total_files == len_python_files + len_static_files
@pytest.mark.parametrize(
"order,expected",
(
(
"shutdown_first",
[
"Restarting a process",
"Begin restart termination",
"Starting a process",
],
),
(
"startup_first",
[
"Restarting a process",
"Starting a process",
"Begin restart termination",
"Waiting for process to be acked",
"Process acked. Terminating",
],
),
),
)
def test_default_reload_shutdown_order(monkeypatch, caplog, order, expected):
current_process = Mock()
worker_process = WorkerProcess(
lambda **_: current_process,
"Test",
lambda **_: ...,
{},
{},
RestartOrder[order.upper()],
)
def start(self):
worker_process.set_state(ProcessState.ACKED)
self._target()
orig = threading.Thread.start
monkeypatch.setattr(threading.Thread, "start", start)
with caplog.at_level(DEBUG):
worker_process.restart()
ansi = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
def clean(msg: str):
msg, _ = ansi.sub("", msg).split(":", 1)
return msg
debug = [clean(record[2]) for record in caplog.record_tuples]
assert debug == expected
current_process.start.assert_called_once()
current_process.terminate.assert_called_once()
monkeypatch.setattr(threading.Thread, "start", orig)
def test_reloader_triggers_start_stop_listeners(
app: Sanic, app_loader: AppLoader
):
@ -159,58 +217,3 @@ def test_check_file(tmp_path):
assert Reloader.check_file(current, mtimes) is False
mtimes[current] = mtimes[current] - 1
assert Reloader.check_file(current, mtimes) is True
@pytest.mark.parametrize(
"order,expected",
(
(
"shutdown_first",
[
"Restarting a process",
"Begin restart termination",
"Starting a process",
],
),
(
"startup_first",
[
"Restarting a process",
"Starting a process",
"Begin restart termination",
"Process acked. Terminating",
],
),
),
)
def test_default_reload_shutdown_order(monkeypatch, caplog, order, expected):
current_process = Mock()
worker_process = WorkerProcess(
lambda **_: current_process,
"Test",
lambda **_: ...,
{},
{},
RestartOrder[order.upper()],
)
def start(self):
worker_process.set_state(ProcessState.ACKED)
self._target()
monkeypatch.setattr(threading.Thread, "start", start)
with caplog.at_level(DEBUG):
worker_process.restart()
ansi = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
def clean(msg: str):
msg, _ = ansi.sub("", msg).split(":", 1)
return msg
debug = [clean(record[2]) for record in caplog.record_tuples]
assert debug == expected
current_process.start.assert_called_once()
current_process.terminate.assert_called_once()