Sanic Server WorkerManager refactor (#2499)

Co-authored-by: Néstor Pérez <25409753+prryplatypus@users.noreply.github.com>
This commit is contained in:
Adam Hopkins
2022-09-18 17:17:23 +03:00
committed by GitHub
parent d352a4155e
commit 4726cf1910
73 changed files with 3929 additions and 940 deletions

View File

@@ -126,7 +126,7 @@ def sanic_router(app):
except RouteExists:
pass
router.finalize()
return router, added_router
return router, tuple(added_router)
return _setup

View File

@@ -1,6 +1,8 @@
import json
from sanic import Sanic, text
from sanic.application.constants import Mode
from sanic.config import Config
from sanic.log import LOGGING_CONFIG_DEFAULTS, logger
@@ -16,7 +18,7 @@ async def handler(request):
return text(request.ip)
@app.before_server_start
@app.main_process_start
async def app_info_dump(app: Sanic, _):
app_data = {
"access_log": app.config.ACCESS_LOG,
@@ -27,6 +29,13 @@ async def app_info_dump(app: Sanic, _):
logger.info(json.dumps(app_data))
@app.main_process_stop
async def app_cleanup(app: Sanic, _):
app.state.auto_reload = False
app.state.mode = Mode.PRODUCTION
app.config = Config()
@app.after_server_start
async def shutdown(app: Sanic, _):
app.stop()
@@ -38,8 +47,8 @@ def create_app():
def create_app_with_args(args):
try:
print(f"foo={args.foo}")
logger.info(f"foo={args.foo}")
except AttributeError:
print(f"module={args.module}")
logger.info(f"module={args.module}")
return app

View File

@@ -35,6 +35,7 @@ def test_server_starts_http3(app: Sanic, version, caplog):
"cert": localhost_dir / "fullchain.pem",
"key": localhost_dir / "privkey.pem",
},
single_process=True,
)
assert ev.is_set()
@@ -69,7 +70,7 @@ def test_server_starts_http1_and_http3(app: Sanic, caplog):
},
)
with caplog.at_level(logging.INFO):
Sanic.serve()
Sanic.serve_single()
assert (
"sanic.root",

View File

@@ -4,6 +4,7 @@ import re
from collections import Counter
from inspect import isawaitable
from os import environ
from unittest.mock import Mock, patch
import pytest
@@ -15,6 +16,7 @@ from sanic.compat import OS_IS_WINDOWS
from sanic.config import Config
from sanic.exceptions import SanicException
from sanic.helpers import _default
from sanic.log import LOGGING_CONFIG_DEFAULTS
from sanic.response import text
@@ -23,7 +25,7 @@ def clear_app_registry():
Sanic._app_registry = {}
def test_app_loop_running(app):
def test_app_loop_running(app: Sanic):
@app.get("/test")
async def handler(request):
assert isinstance(app.loop, asyncio.AbstractEventLoop)
@@ -33,7 +35,7 @@ def test_app_loop_running(app):
assert response.text == "pass"
def test_create_asyncio_server(app):
def test_create_asyncio_server(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(return_asyncio_server=True)
assert isawaitable(asyncio_srv_coro)
@@ -41,7 +43,7 @@ def test_create_asyncio_server(app):
assert srv.is_serving() is True
def test_asyncio_server_no_start_serving(app):
def test_asyncio_server_no_start_serving(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
port=43123,
@@ -52,7 +54,7 @@ def test_asyncio_server_no_start_serving(app):
assert srv.is_serving() is False
def test_asyncio_server_start_serving(app):
def test_asyncio_server_start_serving(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
port=43124,
@@ -69,7 +71,7 @@ def test_asyncio_server_start_serving(app):
# Looks like we can't easily test `serve_forever()`
def test_create_server_main(app, caplog):
def test_create_server_main(app: Sanic, caplog):
app.listener("main_process_start")(lambda *_: ...)
loop = asyncio.get_event_loop()
with caplog.at_level(logging.INFO):
@@ -83,7 +85,7 @@ def test_create_server_main(app, caplog):
) in caplog.record_tuples
def test_create_server_no_startup(app):
def test_create_server_no_startup(app: Sanic):
loop = asyncio.get_event_loop()
asyncio_srv_coro = app.create_server(
port=43124,
@@ -98,7 +100,7 @@ def test_create_server_no_startup(app):
loop.run_until_complete(srv.start_serving())
def test_create_server_main_convenience(app, caplog):
def test_create_server_main_convenience(app: Sanic, caplog):
app.main_process_start(lambda *_: ...)
loop = asyncio.get_event_loop()
with caplog.at_level(logging.INFO):
@@ -112,7 +114,7 @@ def test_create_server_main_convenience(app, caplog):
) in caplog.record_tuples
def test_app_loop_not_running(app):
def test_app_loop_not_running(app: Sanic):
with pytest.raises(SanicException) as excinfo:
app.loop
@@ -122,7 +124,7 @@ def test_app_loop_not_running(app):
)
def test_app_run_raise_type_error(app):
def test_app_run_raise_type_error(app: Sanic):
with pytest.raises(TypeError) as excinfo:
app.run(loop="loop")
@@ -135,7 +137,7 @@ def test_app_run_raise_type_error(app):
)
def test_app_route_raise_value_error(app):
def test_app_route_raise_value_error(app: Sanic):
with pytest.raises(ValueError) as excinfo:
@@ -149,11 +151,10 @@ def test_app_route_raise_value_error(app):
)
def test_app_handle_request_handler_is_none(app, monkeypatch):
def test_app_handle_request_handler_is_none(app: Sanic, monkeypatch):
def mockreturn(*args, **kwargs):
return Mock(), None, {}
# Not sure how to make app.router.get() return None, so use mock here.
monkeypatch.setattr(app.router, "get", mockreturn)
@app.get("/test")
@@ -170,7 +171,7 @@ def test_app_handle_request_handler_is_none(app, monkeypatch):
@pytest.mark.parametrize("websocket_enabled", [True, False])
@pytest.mark.parametrize("enable", [True, False])
def test_app_enable_websocket(app, websocket_enabled, enable):
def test_app_enable_websocket(app: Sanic, websocket_enabled, enable):
app.websocket_enabled = websocket_enabled
app.enable_websocket(enable=enable)
@@ -180,11 +181,11 @@ def test_app_enable_websocket(app, websocket_enabled, enable):
async def handler(request, ws):
await ws.send("test")
assert app.websocket_enabled == True
assert app.websocket_enabled is True
@patch("sanic.mixins.runner.WebSocketProtocol")
def test_app_websocket_parameters(websocket_protocol_mock, app):
@patch("sanic.mixins.startup.WebSocketProtocol")
def test_app_websocket_parameters(websocket_protocol_mock, app: Sanic):
app.config.WEBSOCKET_MAX_SIZE = 44
app.config.WEBSOCKET_PING_TIMEOUT = 48
app.config.WEBSOCKET_PING_INTERVAL = 50
@@ -194,9 +195,10 @@ def test_app_websocket_parameters(websocket_protocol_mock, app):
await ws.send("test")
try:
# This will fail because WebSocketProtocol is mocked and only the call kwargs matter
# This will fail because WebSocketProtocol is mocked and only the
# call kwargs matter
app.test_client.get("/ws")
except:
except Exception:
pass
websocket_protocol_call_args = websocket_protocol_mock.call_args
@@ -212,11 +214,10 @@ def test_app_websocket_parameters(websocket_protocol_mock, app):
)
def test_handle_request_with_nested_exception(app, monkeypatch):
def test_handle_request_with_nested_exception(app: Sanic, monkeypatch):
err_msg = "Mock Exception"
# Not sure how to raise an exception in app.error_handler.response(), use mock here
def mock_error_handler_response(*args, **kwargs):
raise Exception(err_msg)
@@ -233,11 +234,10 @@ def test_handle_request_with_nested_exception(app, monkeypatch):
assert response.text == "An error occurred while handling an error"
def test_handle_request_with_nested_exception_debug(app, monkeypatch):
def test_handle_request_with_nested_exception_debug(app: Sanic, monkeypatch):
err_msg = "Mock Exception"
# Not sure how to raise an exception in app.error_handler.response(), use mock here
def mock_error_handler_response(*args, **kwargs):
raise Exception(err_msg)
@@ -252,13 +252,14 @@ def test_handle_request_with_nested_exception_debug(app, monkeypatch):
request, response = app.test_client.get("/", debug=True)
assert response.status == 500
assert response.text.startswith(
f"Error while handling error: {err_msg}\nStack: Traceback (most recent call last):\n"
f"Error while handling error: {err_msg}\n"
"Stack: Traceback (most recent call last):\n"
)
def test_handle_request_with_nested_sanic_exception(app, monkeypatch, caplog):
# Not sure how to raise an exception in app.error_handler.response(), use mock here
def test_handle_request_with_nested_sanic_exception(
app: Sanic, monkeypatch, caplog
):
def mock_error_handler_response(*args, **kwargs):
raise SanicException("Mock SanicException")
@@ -301,8 +302,12 @@ def test_app_has_test_mode_sync():
def test_app_registry():
assert len(Sanic._app_registry) == 0
instance = Sanic("test")
assert len(Sanic._app_registry) == 1
assert Sanic._app_registry["test"] is instance
Sanic.unregister_app(instance)
assert len(Sanic._app_registry) == 0
def test_app_registry_wrong_type():
@@ -371,7 +376,7 @@ def test_get_app_default_ambiguous():
Sanic.get_app()
def test_app_set_attribute_warning(app):
def test_app_set_attribute_warning(app: Sanic):
message = (
"Setting variables on Sanic instances is not allowed. You should "
"change your Sanic instance to use instance.ctx.foo instead."
@@ -380,7 +385,7 @@ def test_app_set_attribute_warning(app):
app.foo = 1
def test_app_set_context(app):
def test_app_set_context(app: Sanic):
app.ctx.foo = 1
retrieved = Sanic.get_app(app.name)
@@ -426,13 +431,13 @@ def test_custom_context():
@pytest.mark.parametrize("use", (False, True))
def test_uvloop_config(app, monkeypatch, use):
def test_uvloop_config(app: Sanic, monkeypatch, use):
@app.get("/test")
def handler(request):
return text("ok")
try_use_uvloop = Mock()
monkeypatch.setattr(sanic.mixins.runner, "try_use_uvloop", try_use_uvloop)
monkeypatch.setattr(sanic.mixins.startup, "try_use_uvloop", try_use_uvloop)
# Default config
app.test_client.get("/test")
@@ -458,7 +463,7 @@ def test_uvloop_cannot_never_called_with_create_server(caplog, monkeypatch):
apps[2].config.USE_UVLOOP = True
try_use_uvloop = Mock()
monkeypatch.setattr(sanic.mixins.runner, "try_use_uvloop", try_use_uvloop)
monkeypatch.setattr(sanic.mixins.startup, "try_use_uvloop", try_use_uvloop)
loop = asyncio.get_event_loop()
@@ -517,12 +522,133 @@ def test_multiple_uvloop_configs_display_warning(caplog):
assert counter[(logging.WARNING, message)] == 2
def test_cannot_run_fast_and_workers(app):
def test_cannot_run_fast_and_workers(app: Sanic):
message = "You cannot use both fast=True and workers=X"
with pytest.raises(RuntimeError, match=message):
app.run(fast=True, workers=4)
def test_no_workers(app):
def test_no_workers(app: Sanic):
with pytest.raises(RuntimeError, match="Cannot serve with no workers"):
app.run(workers=0)
@pytest.mark.parametrize(
"extra",
(
{"fast": True},
{"workers": 2},
{"auto_reload": True},
),
)
def test_cannot_run_single_process_and_workers_or_auto_reload(
app: Sanic, extra
):
message = (
"Single process cannot be run with multiple workers or auto-reload"
)
with pytest.raises(RuntimeError, match=message):
app.run(single_process=True, **extra)
def test_cannot_run_single_process_and_legacy(app: Sanic):
message = "Cannot run single process and legacy mode"
with pytest.raises(RuntimeError, match=message):
app.run(single_process=True, legacy=True)
def test_cannot_run_without_sys_signals_with_workers(app: Sanic):
message = (
"Cannot run Sanic.serve with register_sys_signals=False. "
"Use either Sanic.serve_single or Sanic.serve_legacy."
)
with pytest.raises(RuntimeError, match=message):
app.run(register_sys_signals=False, single_process=False, legacy=False)
def test_default_configure_logging():
with patch("sanic.app.logging") as mock:
Sanic("Test")
mock.config.dictConfig.assert_called_with(LOGGING_CONFIG_DEFAULTS)
def test_custom_configure_logging():
with patch("sanic.app.logging") as mock:
Sanic("Test", log_config={"foo": "bar"})
mock.config.dictConfig.assert_called_with({"foo": "bar"})
def test_disable_configure_logging():
with patch("sanic.app.logging") as mock:
Sanic("Test", configure_logging=False)
mock.config.dictConfig.assert_not_called()
@pytest.mark.parametrize("inspector", (True, False))
def test_inspector(inspector):
app = Sanic("Test", inspector=inspector)
assert app.config.INSPECTOR is inspector
def test_build_endpoint_name():
app = Sanic("Test")
name = app._build_endpoint_name("foo", "bar")
assert name == "Test.foo.bar"
def test_manager_in_main_process_only(app: Sanic):
message = "Can only access the manager from the main process"
with pytest.raises(SanicException, match=message):
app.manager
app._manager = 1
environ["SANIC_WORKER_PROCESS"] = "ok"
with pytest.raises(SanicException, match=message):
app.manager
del environ["SANIC_WORKER_PROCESS"]
assert app.manager == 1
def test_inspector_in_main_process_only(app: Sanic):
message = "Can only access the inspector from the main process"
with pytest.raises(SanicException, match=message):
app.inspector
app._inspector = 1
environ["SANIC_WORKER_PROCESS"] = "ok"
with pytest.raises(SanicException, match=message):
app.inspector
del environ["SANIC_WORKER_PROCESS"]
assert app.inspector == 1
def test_stop_trigger_terminate(app: Sanic):
app.multiplexer = Mock()
app.stop()
app.multiplexer.terminate.assert_called_once()
app.multiplexer.reset_mock()
assert len(Sanic._app_registry) == 1
Sanic._app_registry.clear()
app.stop(terminate=True)
app.multiplexer.terminate.assert_called_once()
app.multiplexer.reset_mock()
assert len(Sanic._app_registry) == 0
Sanic._app_registry.clear()
app.stop(unregister=False)
app.multiplexer.terminate.assert_called_once()

View File

@@ -1,13 +1,16 @@
import asyncio
from sanic import Sanic
def test_bad_request_response(app):
def test_bad_request_response(app: Sanic):
lines = []
app.get("/")(lambda x: ...)
@app.listener("after_server_start")
async def _request(sanic, loop):
nonlocal lines
connect = asyncio.open_connection("127.0.0.1", 42101)
reader, writer = await connect
writer.write(b"not http\r\n\r\n")
@@ -18,6 +21,6 @@ def test_bad_request_response(app):
lines.append(line)
app.stop()
app.run(host="127.0.0.1", port=42101, debug=False)
app.run(host="127.0.0.1", port=42101, debug=False, single_process=True)
assert lines[0] == b"HTTP/1.1 400 Bad Request\r\n"
assert b"Bad Request" in lines[-2]

View File

@@ -1,5 +1,6 @@
import json
import subprocess
import os
import sys
from pathlib import Path
from typing import List, Optional, Tuple
@@ -9,33 +10,30 @@ import pytest
from sanic_routing import __version__ as __routing_version__
from sanic import __version__
from sanic.__main__ import main
def capture(command: List[str]):
proc = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=Path(__file__).parent,
)
@pytest.fixture(scope="module", autouse=True)
def tty():
orig = sys.stdout.isatty
sys.stdout.isatty = lambda: False
yield
sys.stdout.isatty = orig
def capture(command: List[str], caplog):
caplog.clear()
os.chdir(Path(__file__).parent)
try:
out, err = proc.communicate(timeout=10)
except subprocess.TimeoutExpired:
proc.kill()
out, err = proc.communicate()
return out, err, proc.returncode
def starting_line(lines: List[str]):
for idx, line in enumerate(lines):
if line.strip().startswith(b"Sanic v"):
return idx
return 0
main(command)
except SystemExit:
...
return [record.message for record in caplog.records]
def read_app_info(lines: List[str]):
for line in lines:
if line.startswith(b"{") and line.endswith(b"}"):
if line.startswith("{") and line.endswith("}"): # type: ignore
return json.loads(line)
@@ -47,59 +45,57 @@ def read_app_info(lines: List[str]):
("fake.server.create_app()", None),
),
)
def test_server_run(appname: str, extra: Optional[str]):
command = ["sanic", appname]
def test_server_run(
appname: str,
extra: Optional[str],
caplog: pytest.LogCaptureFixture,
):
command = [appname]
if extra:
command.append(extra)
out, err, exitcode = capture(command)
lines = out.split(b"\n")
firstline = lines[starting_line(lines) + 1]
lines = capture(command, caplog)
assert exitcode != 1
assert firstline == b"Goin' Fast @ http://127.0.0.1:8000"
assert "Goin' Fast @ http://127.0.0.1:8000" in lines
def test_server_run_factory_with_args():
def test_server_run_factory_with_args(caplog):
command = [
"sanic",
"fake.server.create_app_with_args",
"--factory",
]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
lines = capture(command, caplog)
assert exitcode != 1, lines
assert b"module=fake.server.create_app_with_args" in lines
assert "module=fake.server.create_app_with_args" in lines
def test_server_run_factory_with_args_arbitrary():
def test_server_run_factory_with_args_arbitrary(caplog):
command = [
"sanic",
"fake.server.create_app_with_args",
"--factory",
"--foo=bar",
]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
lines = capture(command, caplog)
assert exitcode != 1, lines
assert b"foo=bar" in lines
assert "foo=bar" in lines
def test_error_with_function_as_instance_without_factory_arg():
command = ["sanic", "fake.server.create_app"]
out, err, exitcode = capture(command)
assert b"try: \nsanic fake.server.create_app --factory" in err
assert exitcode != 1
def test_error_with_path_as_instance_without_simple_arg():
command = ["sanic", "./fake/"]
out, err, exitcode = capture(command)
def test_error_with_function_as_instance_without_factory_arg(caplog):
command = ["fake.server.create_app"]
lines = capture(command, caplog)
assert (
b"Please use --simple if you are passing a directory to sanic." in err
)
assert exitcode != 1
"Failed to run app: Module is not a Sanic app, it is a function\n "
"If this callable returns a Sanic instance try: \n"
"sanic fake.server.create_app --factory"
) in lines
def test_error_with_path_as_instance_without_simple_arg(caplog):
command = ["./fake/"]
lines = capture(command, caplog)
assert (
"Failed to run app: App not found.\n Please use --simple if you "
"are passing a directory to sanic.\n eg. sanic ./fake/ --simple"
) in lines
@pytest.mark.parametrize(
@@ -120,13 +116,10 @@ def test_error_with_path_as_instance_without_simple_arg():
),
),
)
def test_tls_options(cmd: Tuple[str, ...]):
command = ["sanic", "fake.server.app", *cmd, "-p=9999", "--debug"]
out, err, exitcode = capture(command)
assert exitcode != 1
lines = out.split(b"\n")
firstline = lines[starting_line(lines) + 1]
assert firstline == b"Goin' Fast @ https://127.0.0.1:9999"
def test_tls_options(cmd: Tuple[str, ...], caplog):
command = ["fake.server.app", *cmd, "--port=9999", "--debug"]
lines = capture(command, caplog)
assert "Goin' Fast @ https://127.0.0.1:9999" in lines
@pytest.mark.parametrize(
@@ -141,14 +134,15 @@ def test_tls_options(cmd: Tuple[str, ...]):
("--tls-strict-host",),
),
)
def test_tls_wrong_options(cmd: Tuple[str, ...]):
command = ["sanic", "fake.server.app", *cmd, "-p=9999", "--debug"]
out, err, exitcode = capture(command)
assert exitcode == 1
assert not out
lines = err.decode().split("\n")
def test_tls_wrong_options(cmd: Tuple[str, ...], caplog):
command = ["fake.server.app", *cmd, "-p=9999", "--debug"]
lines = capture(command, caplog)
assert "TLS certificates must be specified by either of:" in lines
assert (
"TLS certificates must be specified by either of:\n "
"--cert certdir/fullchain.pem --key certdir/privkey.pem\n "
"--tls certdir (equivalent to the above)"
) in lines
@pytest.mark.parametrize(
@@ -158,65 +152,44 @@ def test_tls_wrong_options(cmd: Tuple[str, ...]):
("-H", "localhost", "-p", "9999"),
),
)
def test_host_port_localhost(cmd: Tuple[str, ...]):
command = ["sanic", "fake.server.app", *cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
expected = b"Goin' Fast @ http://localhost:9999"
def test_host_port_localhost(cmd: Tuple[str, ...], caplog):
command = ["fake.server.app", *cmd]
lines = capture(command, caplog)
expected = "Goin' Fast @ http://localhost:9999"
assert exitcode != 1
assert expected in lines, f"Lines found: {lines}\nErr output: {err}"
assert expected in lines
@pytest.mark.parametrize(
"cmd",
"cmd,expected",
(
("--host=127.0.0.127", "--port=9999"),
("-H", "127.0.0.127", "-p", "9999"),
(
("--host=localhost", "--port=9999"),
"Goin' Fast @ http://localhost:9999",
),
(
("-H", "localhost", "-p", "9999"),
"Goin' Fast @ http://localhost:9999",
),
(
("--host=127.0.0.127", "--port=9999"),
"Goin' Fast @ http://127.0.0.127:9999",
),
(
("-H", "127.0.0.127", "-p", "9999"),
"Goin' Fast @ http://127.0.0.127:9999",
),
(("--host=::", "--port=9999"), "Goin' Fast @ http://[::]:9999"),
(("-H", "::", "-p", "9999"), "Goin' Fast @ http://[::]:9999"),
(("--host=::1", "--port=9999"), "Goin' Fast @ http://[::1]:9999"),
(("-H", "::1", "-p", "9999"), "Goin' Fast @ http://[::1]:9999"),
),
)
def test_host_port_ipv4(cmd: Tuple[str, ...]):
command = ["sanic", "fake.server.app", *cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
expected = b"Goin' Fast @ http://127.0.0.127:9999"
def test_host_port(cmd: Tuple[str, ...], expected: str, caplog):
command = ["fake.server.app", *cmd]
lines = capture(command, caplog)
assert exitcode != 1
assert expected in lines, f"Lines found: {lines}\nErr output: {err}"
@pytest.mark.parametrize(
"cmd",
(
("--host=::", "--port=9999"),
("-H", "::", "-p", "9999"),
),
)
def test_host_port_ipv6_any(cmd: Tuple[str, ...]):
command = ["sanic", "fake.server.app", *cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
expected = b"Goin' Fast @ http://[::]:9999"
assert exitcode != 1
assert expected in lines, f"Lines found: {lines}\nErr output: {err}"
@pytest.mark.parametrize(
"cmd",
(
("--host=::1", "--port=9999"),
("-H", "::1", "-p", "9999"),
),
)
def test_host_port_ipv6_loopback(cmd: Tuple[str, ...]):
command = ["sanic", "fake.server.app", *cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
expected = b"Goin' Fast @ http://[::1]:9999"
assert exitcode != 1
assert expected in lines, f"Lines found: {lines}\nErr output: {err}"
assert expected in lines
@pytest.mark.parametrize(
@@ -230,82 +203,74 @@ def test_host_port_ipv6_loopback(cmd: Tuple[str, ...]):
(4, ("-w", "4")),
),
)
def test_num_workers(num: int, cmd: Tuple[str, ...]):
command = ["sanic", "fake.server.app", *cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
def test_num_workers(num: int, cmd: Tuple[str, ...], caplog):
command = ["fake.server.app", *cmd]
lines = capture(command, caplog)
if num == 1:
expected = b"mode: production, single worker"
expected = "mode: production, single worker"
else:
expected = (f"mode: production, w/ {num} workers").encode()
expected = f"mode: production, w/ {num} workers"
assert exitcode != 1
assert expected in lines, f"Expected {expected}\nLines found: {lines}"
assert expected in lines
@pytest.mark.parametrize("cmd", ("--debug",))
def test_debug(cmd: str):
command = ["sanic", "fake.server.app", cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
def test_debug(cmd: str, caplog):
command = ["fake.server.app", cmd]
lines = capture(command, caplog)
info = read_app_info(lines)
assert info["debug"] is True, f"Lines found: {lines}\nErr output: {err}"
assert (
info["auto_reload"] is False
), f"Lines found: {lines}\nErr output: {err}"
assert "dev" not in info, f"Lines found: {lines}\nErr output: {err}"
assert info["debug"] is True
assert info["auto_reload"] is False
@pytest.mark.parametrize("cmd", ("--dev", "-d"))
def test_dev(cmd: str):
command = ["sanic", "fake.server.app", cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
def test_dev(cmd: str, caplog):
command = ["fake.server.app", cmd]
lines = capture(command, caplog)
info = read_app_info(lines)
assert info["debug"] is True, f"Lines found: {lines}\nErr output: {err}"
assert (
info["auto_reload"] is True
), f"Lines found: {lines}\nErr output: {err}"
assert info["debug"] is True
assert info["auto_reload"] is True
@pytest.mark.parametrize("cmd", ("--auto-reload", "-r"))
def test_auto_reload(cmd: str):
command = ["sanic", "fake.server.app", cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
def test_auto_reload(cmd: str, caplog):
command = ["fake.server.app", cmd]
lines = capture(command, caplog)
info = read_app_info(lines)
assert info["debug"] is False, f"Lines found: {lines}\nErr output: {err}"
assert (
info["auto_reload"] is True
), f"Lines found: {lines}\nErr output: {err}"
assert "dev" not in info, f"Lines found: {lines}\nErr output: {err}"
assert info["debug"] is False
assert info["auto_reload"] is True
@pytest.mark.parametrize(
"cmd,expected", (("--access-log", True), ("--no-access-log", False))
"cmd,expected",
(
("", False),
("--debug", True),
("--access-log", True),
("--no-access-log", False),
),
)
def test_access_logs(cmd: str, expected: bool):
command = ["sanic", "fake.server.app", cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
def test_access_logs(cmd: str, expected: bool, caplog):
command = ["fake.server.app"]
if cmd:
command.append(cmd)
lines = capture(command, caplog)
info = read_app_info(lines)
assert (
info["access_log"] is expected
), f"Lines found: {lines}\nErr output: {err}"
assert info["access_log"] is expected
@pytest.mark.parametrize("cmd", ("--version", "-v"))
def test_version(cmd: str):
command = ["sanic", cmd]
out, err, exitcode = capture(command)
def test_version(cmd: str, caplog, capsys):
command = [cmd]
capture(command, caplog)
version_string = f"Sanic {__version__}; Routing {__routing_version__}\n"
assert out == version_string.encode("utf-8")
out, _ = capsys.readouterr()
assert version_string == out
@pytest.mark.parametrize(
@@ -315,12 +280,9 @@ def test_version(cmd: str):
("--no-noisy-exceptions", False),
),
)
def test_noisy_exceptions(cmd: str, expected: bool):
command = ["sanic", "fake.server.app", cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
def test_noisy_exceptions(cmd: str, expected: bool, caplog):
command = ["fake.server.app", cmd]
lines = capture(command, caplog)
info = read_app_info(lines)
assert (
info["noisy_exceptions"] is expected
), f"Lines found: {lines}\nErr output: {err}"
assert info["noisy_exceptions"] is expected

View File

@@ -293,26 +293,21 @@ def test_config_custom_defaults_with_env():
del environ[key]
def test_config_access_log_passing_in_run(app: Sanic):
assert app.config.ACCESS_LOG is True
@pytest.mark.parametrize("access_log", (True, False))
def test_config_access_log_passing_in_run(app: Sanic, access_log):
assert app.config.ACCESS_LOG is False
@app.listener("after_server_start")
async def _request(sanic, loop):
app.stop()
app.run(port=1340, access_log=False)
assert app.config.ACCESS_LOG is False
app.router.reset()
app.signal_router.reset()
app.run(port=1340, access_log=True)
assert app.config.ACCESS_LOG is True
app.run(port=1340, access_log=access_log, single_process=True)
assert app.config.ACCESS_LOG is access_log
@pytest.mark.asyncio
async def test_config_access_log_passing_in_create_server(app: Sanic):
assert app.config.ACCESS_LOG is True
assert app.config.ACCESS_LOG is False
@app.listener("after_server_start")
async def _request(sanic, loop):

View File

@@ -1,15 +1,24 @@
import pytest
from sanic import Sanic, text
from sanic.application.constants import Mode, Server, ServerStage
from sanic.constants import HTTP_METHODS, HTTPMethod
def test_string_compat():
assert "GET" == HTTPMethod.GET
assert "GET" in HTTP_METHODS
assert "get" == HTTPMethod.GET
assert "get" in HTTP_METHODS
@pytest.mark.parametrize("enum", (HTTPMethod, Server, Mode))
def test_string_compat(enum):
for key in enum.__members__.keys():
assert key.upper() == getattr(enum, key).upper()
assert key.lower() == getattr(enum, key).lower()
assert HTTPMethod.GET.lower() == "get"
assert HTTPMethod.GET.upper() == "GET"
def test_http_methods():
for value in HTTPMethod.__members__.values():
assert value in HTTP_METHODS
def test_server_stage():
assert ServerStage.SERVING > ServerStage.PARTIAL > ServerStage.STOPPED
def test_use_in_routes(app: Sanic):

View File

@@ -1,44 +1,44 @@
# import pytest
import pytest
# from sanic.response import text
# from sanic.router import RouteExists
from sanic_routing.exceptions import RouteExists
from sanic.response import text
# @pytest.mark.parametrize(
# "method,attr, expected",
# [
# ("get", "text", "OK1 test"),
# ("post", "text", "OK2 test"),
# ("put", "text", "OK2 test"),
# ("delete", "status", 405),
# ],
# )
# def test_overload_dynamic_routes(app, method, attr, expected):
# @app.route("/overload/<param>", methods=["GET"])
# async def handler1(request, param):
# return text("OK1 " + param)
@pytest.mark.parametrize(
"method,attr, expected",
[
("get", "text", "OK1 test"),
("post", "text", "OK2 test"),
("put", "text", "OK2 test"),
],
)
def test_overload_dynamic_routes(app, method, attr, expected):
@app.route("/overload/<param>", methods=["GET"])
async def handler1(request, param):
return text("OK1 " + param)
# @app.route("/overload/<param>", methods=["POST", "PUT"])
# async def handler2(request, param):
# return text("OK2 " + param)
@app.route("/overload/<param>", methods=["POST", "PUT"])
async def handler2(request, param):
return text("OK2 " + param)
# request, response = getattr(app.test_client, method)("/overload/test")
# assert getattr(response, attr) == expected
request, response = getattr(app.test_client, method)("/overload/test")
assert getattr(response, attr) == expected
# def test_overload_dynamic_routes_exist(app):
# @app.route("/overload/<param>", methods=["GET"])
# async def handler1(request, param):
# return text("OK1 " + param)
def test_overload_dynamic_routes_exist(app):
@app.route("/overload/<param>", methods=["GET"])
async def handler1(request, param):
return text("OK1 " + param)
# @app.route("/overload/<param>", methods=["POST", "PUT"])
# async def handler2(request, param):
# return text("OK2 " + param)
@app.route("/overload/<param>", methods=["POST", "PUT"])
async def handler2(request, param):
return text("OK2 " + param)
# # if this doesn't raise an error, than at least the below should happen:
# # assert response.text == 'Duplicated'
# with pytest.raises(RouteExists):
# if this doesn't raise an error, than at least the below should happen:
# assert response.text == 'Duplicated'
with pytest.raises(RouteExists):
# @app.route("/overload/<param>", methods=["PUT", "DELETE"])
# async def handler3(request, param):
# return text("Duplicated")
@app.route("/overload/<param>", methods=["PUT", "DELETE"])
async def handler3(request, param):
return text("Duplicated")

View File

@@ -353,7 +353,7 @@ def test_config_fallback_before_and_after_startup(app):
_, response = app.test_client.get("/error")
assert response.status == 500
assert response.content_type == "text/plain; charset=utf-8"
assert response.content_type == "application/json"
def test_config_fallback_using_update_dict(app):

View File

@@ -25,19 +25,19 @@ def stoppable_app(app):
def test_ext_is_loaded(stoppable_app: Sanic, sanic_ext):
stoppable_app.run()
stoppable_app.run(single_process=True)
sanic_ext.Extend.assert_called_once_with(stoppable_app)
def test_ext_is_not_loaded(stoppable_app: Sanic, sanic_ext):
stoppable_app.config.AUTO_EXTEND = False
stoppable_app.run()
stoppable_app.run(single_process=True)
sanic_ext.Extend.assert_not_called()
def test_extend_with_args(stoppable_app: Sanic, sanic_ext):
stoppable_app.extend(built_in_extensions=False)
stoppable_app.run()
stoppable_app.run(single_process=True)
sanic_ext.Extend.assert_called_once_with(
stoppable_app, built_in_extensions=False, config=None, extensions=None
)
@@ -80,5 +80,5 @@ def test_can_access_app_ext_while_running(app: Sanic, sanic_ext, ext_instance):
app.ext.injection(IceCream)
app.stop()
app.run()
app.run(single_process=True)
ext_instance.injection.assert_called_with(IceCream)

View File

@@ -1,48 +1,61 @@
import asyncio
import logging
import time
from multiprocessing import Process
from pytest import LogCaptureFixture
import httpx
from sanic.response import empty
PORT = 42101
def test_no_exceptions_when_cancel_pending_request(app, caplog):
def test_no_exceptions_when_cancel_pending_request(
app, caplog: LogCaptureFixture
):
app.config.GRACEFUL_SHUTDOWN_TIMEOUT = 1
@app.get("/")
async def handler(request):
await asyncio.sleep(5)
@app.after_server_start
def shutdown(app, _):
time.sleep(0.2)
@app.listener("after_server_start")
async def _request(sanic, loop):
connect = asyncio.open_connection("127.0.0.1", 8000)
_, writer = await connect
writer.write(b"GET / HTTP/1.1\r\n\r\n")
app.stop()
def ping():
time.sleep(0.1)
response = httpx.get("http://127.0.0.1:8000")
print(response.status_code)
with caplog.at_level(logging.INFO):
app.run(single_process=True, access_log=True)
p = Process(target=ping)
p.start()
assert "Request: GET http:/// stopped. Transport is closed." in caplog.text
def test_completes_request(app, caplog: LogCaptureFixture):
app.config.GRACEFUL_SHUTDOWN_TIMEOUT = 1
@app.get("/")
async def handler(request):
await asyncio.sleep(0.5)
return empty()
@app.listener("after_server_start")
async def _request(sanic, loop):
connect = asyncio.open_connection("127.0.0.1", 8000)
_, writer = await connect
writer.write(b"GET / HTTP/1.1\r\n\r\n")
app.stop()
with caplog.at_level(logging.INFO):
app.run()
app.run(single_process=True, access_log=True)
p.kill()
assert ("sanic.access", 20, "") in caplog.record_tuples
info = 0
for record in caplog.record_tuples:
assert record[1] != logging.ERROR
if record[1] == logging.INFO:
info += 1
if record[2].startswith("Request:"):
assert record[2] == (
"Request: GET http://127.0.0.1:8000/ stopped. "
"Transport is closed."
)
assert info == 11
# Make sure that the server starts shutdown process before access log
index_stopping = 0
for idx, record in enumerate(caplog.records):
if record.message.startswith("Stopping worker"):
index_stopping = idx
break
index_request = caplog.record_tuples.index(("sanic.access", 20, ""))
assert index_request > index_stopping > 0

View File

@@ -61,6 +61,6 @@ def test_http1_response_has_alt_svc():
version=1,
port=PORT,
)
Sanic.serve()
Sanic.serve_single(app)
assert f'alt-svc: h3=":{PORT}"\r\n'.encode() in response

25
tests/test_init.py Normal file
View File

@@ -0,0 +1,25 @@
from importlib import import_module
import pytest
@pytest.mark.parametrize(
"item",
(
"__version__",
"Sanic",
"Blueprint",
"HTTPMethod",
"HTTPResponse",
"Request",
"Websocket",
"empty",
"file",
"html",
"json",
"redirect",
"text",
),
)
def test_imports(item):
import_module("sanic", item)

View File

@@ -166,6 +166,7 @@ def test_access_log_client_ip_remote_addr(monkeypatch):
monkeypatch.setattr(sanic.http.http1, "access_logger", access)
app = Sanic("test_logging")
app.config.ACCESS_LOG = True
app.config.PROXIES_COUNT = 2
@app.route("/")
@@ -193,6 +194,7 @@ def test_access_log_client_ip_reqip(monkeypatch):
monkeypatch.setattr(sanic.http.http1, "access_logger", access)
app = Sanic("test_logging")
app.config.ACCESS_LOG = True
@app.route("/")
async def handler(request):

View File

@@ -30,9 +30,12 @@ def test_get_logo_returns_expected_logo(tty, full, expected):
def test_get_logo_returns_no_colors_on_apple_terminal():
platform = sys.platform
sys.platform = "darwin"
os.environ["TERM_PROGRAM"] = "Apple_Terminal"
with patch("sys.stdout.isatty") as isatty:
isatty.return_value = False
sys.platform = "darwin"
os.environ["TERM_PROGRAM"] = "Apple_Terminal"
logo = get_logo()
assert "\033" not in logo
sys.platform = platform
del os.environ["TERM_PROGRAM"]

View File

@@ -3,15 +3,23 @@ import os
import platform
import sys
from unittest.mock import Mock
from unittest.mock import Mock, patch
import pytest
from sanic import Sanic, __version__
from sanic import __version__
from sanic.application.logo import BASE_LOGO
from sanic.application.motd import MOTD, MOTDTTY
@pytest.fixture(autouse=True)
def reset():
try:
del os.environ["SANIC_MOTD_OUTPUT"]
except KeyError:
...
def test_logo_base(app, run_startup):
logs = run_startup(app)
@@ -63,20 +71,13 @@ def test_motd_display(caplog):
@pytest.mark.skipif(sys.version_info < (3, 8), reason="Not on 3.7")
def test_reload_dirs(app):
app.config.LOGO = None
app.config.MOTD = True
app.config.AUTO_RELOAD = True
app.prepare(reload_dir="./", auto_reload=True, motd_display={"foo": "bar"})
existing = MOTD.output
MOTD.output = Mock()
app.motd("foo")
MOTD.output.assert_called_once()
assert (
MOTD.output.call_args.args[2]["auto-reload"]
== f"enabled, {os.getcwd()}"
)
assert MOTD.output.call_args.args[3] == {"foo": "bar"}
MOTD.output = existing
Sanic._app_registry = {}
with patch.object(MOTD, "output") as mock:
app.prepare(
reload_dir="./", auto_reload=True, motd_display={"foo": "bar"}
)
mock.assert_called()
assert mock.call_args.args[2]["auto-reload"] == f"enabled, {os.getcwd()}"
assert mock.call_args.args[3] == {"foo": "bar"}

View File

@@ -1,207 +1,207 @@
import logging
# import logging
from unittest.mock import Mock
# from unittest.mock import Mock
import pytest
# import pytest
from sanic import Sanic
from sanic.response import text
from sanic.server.async_server import AsyncioServer
from sanic.signals import Event
from sanic.touchup.schemes.ode import OptionalDispatchEvent
# from sanic import Sanic
# from sanic.response import text
# from sanic.server.async_server import AsyncioServer
# from sanic.signals import Event
# from sanic.touchup.schemes.ode import OptionalDispatchEvent
try:
from unittest.mock import AsyncMock
except ImportError:
from tests.asyncmock import AsyncMock # type: ignore
# try:
# from unittest.mock import AsyncMock
# except ImportError:
# from tests.asyncmock import AsyncMock # type: ignore
@pytest.fixture
def app_one():
app = Sanic("One")
# @pytest.fixture
# def app_one():
# app = Sanic("One")
@app.get("/one")
async def one(request):
return text("one")
# @app.get("/one")
# async def one(request):
# return text("one")
return app
# return app
@pytest.fixture
def app_two():
app = Sanic("Two")
# @pytest.fixture
# def app_two():
# app = Sanic("Two")
@app.get("/two")
async def two(request):
return text("two")
# @app.get("/two")
# async def two(request):
# return text("two")
return app
# return app
@pytest.fixture(autouse=True)
def clean():
Sanic._app_registry = {}
yield
# @pytest.fixture(autouse=True)
# def clean():
# Sanic._app_registry = {}
# yield
def test_serve_same_app_multiple_tuples(app_one, run_multi):
app_one.prepare(port=23456)
app_one.prepare(port=23457)
# def test_serve_same_app_multiple_tuples(app_one, run_multi):
# app_one.prepare(port=23456)
# app_one.prepare(port=23457)
logs = run_multi(app_one)
assert (
"sanic.root",
logging.INFO,
"Goin' Fast @ http://127.0.0.1:23456",
) in logs
assert (
"sanic.root",
logging.INFO,
"Goin' Fast @ http://127.0.0.1:23457",
) in logs
# logs = run_multi(app_one)
# assert (
# "sanic.root",
# logging.INFO,
# "Goin' Fast @ http://127.0.0.1:23456",
# ) in logs
# assert (
# "sanic.root",
# logging.INFO,
# "Goin' Fast @ http://127.0.0.1:23457",
# ) in logs
def test_serve_multiple_apps(app_one, app_two, run_multi):
app_one.prepare(port=23456)
app_two.prepare(port=23457)
# def test_serve_multiple_apps(app_one, app_two, run_multi):
# app_one.prepare(port=23456)
# app_two.prepare(port=23457)
logs = run_multi(app_one)
assert (
"sanic.root",
logging.INFO,
"Goin' Fast @ http://127.0.0.1:23456",
) in logs
assert (
"sanic.root",
logging.INFO,
"Goin' Fast @ http://127.0.0.1:23457",
) in logs
# logs = run_multi(app_one)
# assert (
# "sanic.root",
# logging.INFO,
# "Goin' Fast @ http://127.0.0.1:23456",
# ) in logs
# assert (
# "sanic.root",
# logging.INFO,
# "Goin' Fast @ http://127.0.0.1:23457",
# ) in logs
def test_listeners_on_secondary_app(app_one, app_two, run_multi):
app_one.prepare(port=23456)
app_two.prepare(port=23457)
# def test_listeners_on_secondary_app(app_one, app_two, run_multi):
# app_one.prepare(port=23456)
# app_two.prepare(port=23457)
before_start = AsyncMock()
after_start = AsyncMock()
before_stop = AsyncMock()
after_stop = AsyncMock()
# before_start = AsyncMock()
# after_start = AsyncMock()
# before_stop = AsyncMock()
# after_stop = AsyncMock()
app_two.before_server_start(before_start)
app_two.after_server_start(after_start)
app_two.before_server_stop(before_stop)
app_two.after_server_stop(after_stop)
# app_two.before_server_start(before_start)
# app_two.after_server_start(after_start)
# app_two.before_server_stop(before_stop)
# app_two.after_server_stop(after_stop)
run_multi(app_one)
# run_multi(app_one)
before_start.assert_awaited_once()
after_start.assert_awaited_once()
before_stop.assert_awaited_once()
after_stop.assert_awaited_once()
# before_start.assert_awaited_once()
# after_start.assert_awaited_once()
# before_stop.assert_awaited_once()
# after_stop.assert_awaited_once()
@pytest.mark.parametrize(
"events",
(
(Event.HTTP_LIFECYCLE_BEGIN,),
(Event.HTTP_LIFECYCLE_BEGIN, Event.HTTP_LIFECYCLE_COMPLETE),
(
Event.HTTP_LIFECYCLE_BEGIN,
Event.HTTP_LIFECYCLE_COMPLETE,
Event.HTTP_LIFECYCLE_REQUEST,
),
),
)
def test_signal_synchronization(app_one, app_two, run_multi, events):
app_one.prepare(port=23456)
app_two.prepare(port=23457)
# @pytest.mark.parametrize(
# "events",
# (
# (Event.HTTP_LIFECYCLE_BEGIN,),
# (Event.HTTP_LIFECYCLE_BEGIN, Event.HTTP_LIFECYCLE_COMPLETE),
# (
# Event.HTTP_LIFECYCLE_BEGIN,
# Event.HTTP_LIFECYCLE_COMPLETE,
# Event.HTTP_LIFECYCLE_REQUEST,
# ),
# ),
# )
# def test_signal_synchronization(app_one, app_two, run_multi, events):
# app_one.prepare(port=23456)
# app_two.prepare(port=23457)
for event in events:
app_one.signal(event)(AsyncMock())
# for event in events:
# app_one.signal(event)(AsyncMock())
run_multi(app_one)
# run_multi(app_one)
assert len(app_two.signal_router.routes) == len(events) + 1
# assert len(app_two.signal_router.routes) == len(events) + 1
signal_handlers = {
signal.handler
for signal in app_two.signal_router.routes
if signal.name.startswith("http")
}
# signal_handlers = {
# signal.handler
# for signal in app_two.signal_router.routes
# if signal.name.startswith("http")
# }
assert len(signal_handlers) == 1
assert list(signal_handlers)[0] is OptionalDispatchEvent.noop
# assert len(signal_handlers) == 1
# assert list(signal_handlers)[0] is OptionalDispatchEvent.noop
def test_warning_main_process_listeners_on_secondary(
app_one, app_two, run_multi
):
app_two.main_process_start(AsyncMock())
app_two.main_process_stop(AsyncMock())
app_one.prepare(port=23456)
app_two.prepare(port=23457)
# def test_warning_main_process_listeners_on_secondary(
# app_one, app_two, run_multi
# ):
# app_two.main_process_start(AsyncMock())
# app_two.main_process_stop(AsyncMock())
# app_one.prepare(port=23456)
# app_two.prepare(port=23457)
log = run_multi(app_one)
# log = run_multi(app_one)
message = (
f"Sanic found 2 listener(s) on "
"secondary applications attached to the main "
"process. These will be ignored since main "
"process listeners can only be attached to your "
"primary application: "
f"{repr(app_one)}"
)
# message = (
# f"Sanic found 2 listener(s) on "
# "secondary applications attached to the main "
# "process. These will be ignored since main "
# "process listeners can only be attached to your "
# "primary application: "
# f"{repr(app_one)}"
# )
assert ("sanic.error", logging.WARNING, message) in log
# assert ("sanic.error", logging.WARNING, message) in log
def test_no_applications():
Sanic._app_registry = {}
message = "Did not find any applications."
with pytest.raises(RuntimeError, match=message):
Sanic.serve()
# def test_no_applications():
# Sanic._app_registry = {}
# message = "Did not find any applications."
# with pytest.raises(RuntimeError, match=message):
# Sanic.serve()
def test_oserror_warning(app_one, app_two, run_multi, capfd):
orig = AsyncioServer.__await__
AsyncioServer.__await__ = Mock(side_effect=OSError("foo"))
app_one.prepare(port=23456, workers=2)
app_two.prepare(port=23457, workers=2)
# def test_oserror_warning(app_one, app_two, run_multi, capfd):
# orig = AsyncioServer.__await__
# AsyncioServer.__await__ = Mock(side_effect=OSError("foo"))
# app_one.prepare(port=23456, workers=2)
# app_two.prepare(port=23457, workers=2)
run_multi(app_one)
# run_multi(app_one)
captured = capfd.readouterr()
assert (
"An OSError was detected on startup. The encountered error was: foo"
) in captured.err
# captured = capfd.readouterr()
# assert (
# "An OSError was detected on startup. The encountered error was: foo"
# ) in captured.err
AsyncioServer.__await__ = orig
# AsyncioServer.__await__ = orig
def test_running_multiple_offset_warning(app_one, app_two, run_multi, capfd):
app_one.prepare(port=23456, workers=2)
app_two.prepare(port=23457)
# def test_running_multiple_offset_warning(app_one, app_two, run_multi, capfd):
# app_one.prepare(port=23456, workers=2)
# app_two.prepare(port=23457)
run_multi(app_one)
# run_multi(app_one)
captured = capfd.readouterr()
assert (
f"The primary application {repr(app_one)} is running "
"with 2 worker(s). All "
"application instances will run with the same number. "
f"You requested {repr(app_two)} to run with "
"1 worker(s), which will be ignored "
"in favor of the primary application."
) in captured.err
# captured = capfd.readouterr()
# assert (
# f"The primary application {repr(app_one)} is running "
# "with 2 worker(s). All "
# "application instances will run with the same number. "
# f"You requested {repr(app_two)} to run with "
# "1 worker(s), which will be ignored "
# "in favor of the primary application."
# ) in captured.err
def test_running_multiple_secondary(app_one, app_two, run_multi, capfd):
app_one.prepare(port=23456, workers=2)
app_two.prepare(port=23457)
# def test_running_multiple_secondary(app_one, app_two, run_multi, capfd):
# app_one.prepare(port=23456, workers=2)
# app_two.prepare(port=23457)
before_start = AsyncMock()
app_two.before_server_start(before_start)
run_multi(app_one)
# before_start = AsyncMock()
# app_two.before_server_start(before_start)
# run_multi(app_one)
before_start.await_count == 2
# before_start.await_count == 2

View File

@@ -4,13 +4,15 @@ import pickle
import random
import signal
from asyncio import sleep
import pytest
from sanic_testing.testing import HOST, PORT
from sanic import Blueprint
from sanic import Blueprint, text
from sanic.log import logger
from sanic.response import text
from sanic.server.socket import configure_socket
@pytest.mark.skipif(
@@ -24,14 +26,108 @@ def test_multiprocessing(app):
num_workers = random.choice(range(2, multiprocessing.cpu_count() * 2 + 1))
process_list = set()
@app.after_server_start
async def shutdown(app):
await sleep(2.1)
app.stop()
def stop_on_alarm(*args):
for process in multiprocessing.active_children():
process_list.add(process.pid)
process.terminate()
signal.signal(signal.SIGALRM, stop_on_alarm)
signal.alarm(3)
app.run(HOST, PORT, workers=num_workers)
signal.alarm(2)
app.run(HOST, 4120, workers=num_workers, debug=True)
assert len(process_list) == num_workers + 1
@pytest.mark.skipif(
not hasattr(signal, "SIGALRM"),
reason="SIGALRM is not implemented for this platform, we have to come "
"up with another timeout strategy to test these",
)
def test_multiprocessing_legacy(app):
"""Tests that the number of children we produce is correct"""
# Selects a number at random so we can spot check
num_workers = random.choice(range(2, multiprocessing.cpu_count() * 2 + 1))
process_list = set()
@app.after_server_start
async def shutdown(app):
await sleep(2.1)
app.stop()
def stop_on_alarm(*args):
for process in multiprocessing.active_children():
process_list.add(process.pid)
signal.signal(signal.SIGALRM, stop_on_alarm)
signal.alarm(2)
app.run(HOST, 4121, workers=num_workers, debug=True, legacy=True)
assert len(process_list) == num_workers
@pytest.mark.skipif(
not hasattr(signal, "SIGALRM"),
reason="SIGALRM is not implemented for this platform, we have to come "
"up with another timeout strategy to test these",
)
def test_multiprocessing_legacy_sock(app):
"""Tests that the number of children we produce is correct"""
# Selects a number at random so we can spot check
num_workers = random.choice(range(2, multiprocessing.cpu_count() * 2 + 1))
process_list = set()
@app.after_server_start
async def shutdown(app):
await sleep(2.1)
app.stop()
def stop_on_alarm(*args):
for process in multiprocessing.active_children():
process_list.add(process.pid)
signal.signal(signal.SIGALRM, stop_on_alarm)
signal.alarm(2)
sock = configure_socket(
{
"host": HOST,
"port": 4121,
"unix": None,
"backlog": 100,
}
)
app.run(workers=num_workers, debug=True, legacy=True, sock=sock)
sock.close()
assert len(process_list) == num_workers
@pytest.mark.skipif(
not hasattr(signal, "SIGALRM"),
reason="SIGALRM is not implemented for this platform, we have to come "
"up with another timeout strategy to test these",
)
def test_multiprocessing_legacy_unix(app):
"""Tests that the number of children we produce is correct"""
# Selects a number at random so we can spot check
num_workers = random.choice(range(2, multiprocessing.cpu_count() * 2 + 1))
process_list = set()
@app.after_server_start
async def shutdown(app):
await sleep(2.1)
app.stop()
def stop_on_alarm(*args):
for process in multiprocessing.active_children():
process_list.add(process.pid)
signal.signal(signal.SIGALRM, stop_on_alarm)
signal.alarm(2)
app.run(workers=num_workers, debug=True, legacy=True, unix="./test.sock")
assert len(process_list) == num_workers
@@ -45,19 +141,23 @@ def test_multiprocessing_with_blueprint(app):
num_workers = random.choice(range(2, multiprocessing.cpu_count() * 2 + 1))
process_list = set()
@app.after_server_start
async def shutdown(app):
await sleep(2.1)
app.stop()
def stop_on_alarm(*args):
for process in multiprocessing.active_children():
process_list.add(process.pid)
process.terminate()
signal.signal(signal.SIGALRM, stop_on_alarm)
signal.alarm(3)
signal.alarm(2)
bp = Blueprint("test_text")
app.blueprint(bp)
app.run(HOST, PORT, workers=num_workers)
app.run(HOST, 4121, workers=num_workers, debug=True)
assert len(process_list) == num_workers
assert len(process_list) == num_workers + 1
# this function must be outside a test function so that it can be
@@ -66,62 +166,58 @@ def handler(request):
return text("Hello")
def stop(app):
app.stop()
# Multiprocessing on Windows requires app to be able to be pickled
@pytest.mark.parametrize("protocol", [3, 4])
def test_pickle_app(app, protocol):
app.route("/")(handler)
app.router.finalize()
app.after_server_start(stop)
app.router.reset()
app.signal_router.reset()
p_app = pickle.dumps(app, protocol=protocol)
del app
up_p_app = pickle.loads(p_app)
up_p_app.router.finalize()
assert up_p_app
request, response = up_p_app.test_client.get("/")
assert response.text == "Hello"
up_p_app.run(single_process=True)
@pytest.mark.parametrize("protocol", [3, 4])
def test_pickle_app_with_bp(app, protocol):
bp = Blueprint("test_text")
bp.route("/")(handler)
bp.after_server_start(stop)
app.blueprint(bp)
app.router.finalize()
app.router.reset()
app.signal_router.reset()
p_app = pickle.dumps(app, protocol=protocol)
del app
up_p_app = pickle.loads(p_app)
up_p_app.router.finalize()
assert up_p_app
request, response = up_p_app.test_client.get("/")
assert response.text == "Hello"
up_p_app.run(single_process=True)
@pytest.mark.parametrize("protocol", [3, 4])
def test_pickle_app_with_static(app, protocol):
app.route("/")(handler)
app.after_server_start(stop)
app.static("/static", "/tmp/static")
app.router.finalize()
app.router.reset()
app.signal_router.reset()
p_app = pickle.dumps(app, protocol=protocol)
del app
up_p_app = pickle.loads(p_app)
up_p_app.router.finalize()
assert up_p_app
request, response = up_p_app.test_client.get("/static/missing.txt")
assert response.status == 404
up_p_app.run(single_process=True)
def test_main_process_event(app, caplog):
# Selects a number at random so we can spot check
num_workers = random.choice(range(2, multiprocessing.cpu_count() * 2 + 1))
def stop_on_alarm(*args):
for process in multiprocessing.active_children():
process.terminate()
signal.signal(signal.SIGALRM, stop_on_alarm)
signal.alarm(1)
app.after_server_start(stop)
@app.listener("main_process_start")
def main_process_start(app, loop):

View File

@@ -1,4 +1,3 @@
from httpx import AsyncByteStream
from sanic_testing.reusable import ReusableClient
from sanic.response import json, text

View File

@@ -17,6 +17,10 @@ def no_skip():
yield
Sanic._app_registry = {}
Sanic.should_auto_reload = should_auto_reload
try:
del os.environ["SANIC_MOTD_OUTPUT"]
except KeyError:
...
def get_primary(app: Sanic) -> ApplicationServerInfo:
@@ -55,17 +59,21 @@ def test_reload_dir(app: Sanic, dirs, caplog):
assert ("sanic.root", logging.WARNING, message) in caplog.record_tuples
def test_fast(app: Sanic, run_multi):
app.prepare(fast=True)
def test_fast(app: Sanic, caplog):
@app.after_server_start
async def stop(app, _):
app.stop()
try:
workers = len(os.sched_getaffinity(0))
except AttributeError:
workers = os.cpu_count() or 1
with caplog.at_level(logging.INFO):
app.prepare(fast=True)
assert app.state.fast
assert app.state.workers == workers
logs = run_multi(app, logging.INFO)
messages = [m[2] for m in logs]
messages = [m[2] for m in caplog.record_tuples]
assert f"mode: production, goin' fast w/ {workers} workers" in messages

View File

@@ -568,7 +568,7 @@ def test_streaming_echo():
@app.listener("after_server_start")
async def client_task(app, loop):
try:
reader, writer = await asyncio.open_connection(*addr)
reader, writer = await asyncio.open_connection("localhost", 8000)
await client(app, reader, writer)
finally:
writer.close()
@@ -576,7 +576,7 @@ def test_streaming_echo():
async def client(app, reader, writer):
# Unfortunately httpx does not support 2-way streaming, so do it by hand.
host = f"host: {addr[0]}:{addr[1]}\r\n".encode()
host = f"host: localhost:8000\r\n".encode()
writer.write(
b"POST /echo HTTP/1.1\r\n" + host + b"content-length: 2\r\n"
b"content-type: text/plain; charset=utf-8\r\n"
@@ -625,6 +625,4 @@ def test_streaming_echo():
# Use random port for tests
with closing(socket()) as sock:
sock.bind(("127.0.0.1", 0))
addr = sock.getsockname()
app.run(sock=sock, access_log=False)
app.run(access_log=False)

View File

@@ -3,69 +3,80 @@ import logging
from time import sleep
import pytest
from sanic import Sanic
from sanic.exceptions import ServiceUnavailable
from sanic.log import LOGGING_CONFIG_DEFAULTS
from sanic.response import text
response_timeout_app = Sanic("test_response_timeout")
response_timeout_default_app = Sanic("test_response_timeout_default")
response_handler_cancelled_app = Sanic("test_response_handler_cancelled")
@pytest.fixture
def response_timeout_app():
app = Sanic("test_response_timeout")
app.config.RESPONSE_TIMEOUT = 1
response_timeout_app.config.RESPONSE_TIMEOUT = 1
response_timeout_default_app.config.RESPONSE_TIMEOUT = 1
response_handler_cancelled_app.config.RESPONSE_TIMEOUT = 1
@app.route("/1")
async def handler_1(request):
await asyncio.sleep(2)
return text("OK")
response_handler_cancelled_app.ctx.flag = False
@app.exception(ServiceUnavailable)
def handler_exception(request, exception):
return text("Response Timeout from error_handler.", 503)
return app
@response_timeout_app.route("/1")
async def handler_1(request):
await asyncio.sleep(2)
return text("OK")
@pytest.fixture
def response_timeout_default_app():
app = Sanic("test_response_timeout_default")
app.config.RESPONSE_TIMEOUT = 1
@app.route("/1")
async def handler_2(request):
await asyncio.sleep(2)
return text("OK")
return app
@response_timeout_app.exception(ServiceUnavailable)
def handler_exception(request, exception):
return text("Response Timeout from error_handler.", 503)
@pytest.fixture
def response_handler_cancelled_app():
app = Sanic("test_response_handler_cancelled")
app.config.RESPONSE_TIMEOUT = 1
app.ctx.flag = False
@app.exception(asyncio.CancelledError)
def handler_cancelled(request, exception):
# If we get a CancelledError, it means sanic has already sent a response,
# we should not ever have to handle a CancelledError.
response_handler_cancelled_app.ctx.flag = True
return text("App received CancelledError!", 500)
# The client will never receive this response, because the socket
# is already closed when we get a CancelledError.
@app.route("/1")
async def handler_3(request):
await asyncio.sleep(2)
return text("OK")
return app
@response_timeout_default_app.route("/1")
async def handler_2(request):
await asyncio.sleep(2)
return text("OK")
@response_handler_cancelled_app.exception(asyncio.CancelledError)
def handler_cancelled(request, exception):
# If we get a CancelledError, it means sanic has already sent a response,
# we should not ever have to handle a CancelledError.
response_handler_cancelled_app.ctx.flag = True
return text("App received CancelledError!", 500)
# The client will never receive this response, because the socket
# is already closed when we get a CancelledError.
@response_handler_cancelled_app.route("/1")
async def handler_3(request):
await asyncio.sleep(2)
return text("OK")
def test_server_error_response_timeout():
def test_server_error_response_timeout(response_timeout_app):
request, response = response_timeout_app.test_client.get("/1")
assert response.status == 503
assert response.text == "Response Timeout from error_handler."
def test_default_server_error_response_timeout():
def test_default_server_error_response_timeout(response_timeout_default_app):
request, response = response_timeout_default_app.test_client.get("/1")
assert response.status == 503
assert "Response Timeout" in response.text
def test_response_handler_cancelled():
def test_response_handler_cancelled(response_handler_cancelled_app):
request, response = response_handler_cancelled_app.test_client.get("/1")
assert response.status == 503
assert "Response Timeout" in response.text

View File

@@ -18,12 +18,6 @@ AVAILABLE_LISTENERS = [
"after_server_stop",
]
skipif_no_alarm = pytest.mark.skipif(
not hasattr(signal, "SIGALRM"),
reason="SIGALRM is not implemented for this platform, we have to come "
"up with another timeout strategy to test these",
)
def create_listener(listener_name, in_list):
async def _listener(app, loop):
@@ -42,18 +36,17 @@ def create_listener_no_loop(listener_name, in_list):
def start_stop_app(random_name_app, **run_kwargs):
def stop_on_alarm(signum, frame):
random_name_app.stop()
@random_name_app.after_server_start
async def shutdown(app):
await asyncio.sleep(1.1)
app.stop()
signal.signal(signal.SIGALRM, stop_on_alarm)
signal.alarm(1)
try:
random_name_app.run(HOST, PORT, **run_kwargs)
random_name_app.run(HOST, PORT, single_process=True, **run_kwargs)
except KeyboardInterrupt:
pass
@skipif_no_alarm
@pytest.mark.parametrize("listener_name", AVAILABLE_LISTENERS)
def test_single_listener(app, listener_name):
"""Test that listeners on their own work"""
@@ -64,7 +57,6 @@ def test_single_listener(app, listener_name):
assert app.name + listener_name == output.pop()
@skipif_no_alarm
@pytest.mark.parametrize("listener_name", AVAILABLE_LISTENERS)
def test_single_listener_no_loop(app, listener_name):
"""Test that listeners on their own work"""
@@ -75,7 +67,6 @@ def test_single_listener_no_loop(app, listener_name):
assert app.name + listener_name == output.pop()
@skipif_no_alarm
@pytest.mark.parametrize("listener_name", AVAILABLE_LISTENERS)
def test_register_listener(app, listener_name):
"""
@@ -90,7 +81,6 @@ def test_register_listener(app, listener_name):
assert app.name + listener_name == output.pop()
@skipif_no_alarm
def test_all_listeners(app):
output = []
for listener_name in AVAILABLE_LISTENERS:
@@ -101,7 +91,6 @@ def test_all_listeners(app):
assert app.name + listener_name == output.pop()
@skipif_no_alarm
def test_all_listeners_as_convenience(app):
output = []
for listener_name in AVAILABLE_LISTENERS:
@@ -159,7 +148,6 @@ def test_create_server_trigger_events(app):
async def stop(app, loop):
nonlocal flag1
flag1 = True
signal.alarm(1)
async def before_stop(app, loop):
nonlocal flag2
@@ -178,10 +166,13 @@ def test_create_server_trigger_events(app):
# Use random port for tests
signal.signal(signal.SIGALRM, stop_on_alarm)
signal.alarm(1)
with closing(socket()) as sock:
sock.bind(("127.0.0.1", 0))
serv_coro = app.create_server(return_asyncio_server=True, sock=sock)
serv_coro = app.create_server(
return_asyncio_server=True, sock=sock, debug=True
)
serv_task = asyncio.ensure_future(serv_coro, loop=loop)
server = loop.run_until_complete(serv_task)
loop.run_until_complete(server.startup())
@@ -199,7 +190,6 @@ def test_create_server_trigger_events(app):
loop.run_until_complete(close_task)
# Complete all tasks on the loop
signal.stopped = True
for connection in server.connections:
connection.close_if_idle()
loop.run_until_complete(server.after_stop())

View File

@@ -33,6 +33,7 @@ def set_loop(app, loop):
def after(app, loop):
print("...")
calledq.put(mock.called)
@@ -48,10 +49,31 @@ def test_register_system_signals(app):
app.listener("before_server_start")(set_loop)
app.listener("after_server_stop")(after)
app.run(HOST, PORT)
app.run(HOST, PORT, single_process=True)
assert calledq.get() is True
@pytest.mark.skipif(os.name == "nt", reason="May hang CI on py38/windows")
def test_no_register_system_signals_fails(app):
"""Test if sanic don't register system signals"""
@app.route("/hello")
async def hello_route(request):
return HTTPResponse()
app.listener("after_server_start")(stop)
app.listener("before_server_start")(set_loop)
app.listener("after_server_stop")(after)
message = (
"Cannot run Sanic.serve with register_sys_signals=False. Use "
"either Sanic.serve_single or Sanic.serve_legacy."
)
with pytest.raises(RuntimeError, match=message):
app.prepare(HOST, PORT, register_sys_signals=False)
assert calledq.empty()
@pytest.mark.skipif(os.name == "nt", reason="May hang CI on py38/windows")
def test_dont_register_system_signals(app):
"""Test if sanic don't register system signals"""
@@ -64,7 +86,7 @@ def test_dont_register_system_signals(app):
app.listener("before_server_start")(set_loop)
app.listener("after_server_stop")(after)
app.run(HOST, PORT, register_sys_signals=False)
app.run(HOST, PORT, register_sys_signals=False, single_process=True)
assert calledq.get() is False

View File

@@ -610,24 +610,24 @@ def test_get_ssl_context_only_mkcert(
MockTrustmeCreator.generate_cert.assert_not_called()
def test_no_http3_with_trustme(
app,
monkeypatch,
MockTrustmeCreator,
):
monkeypatch.setattr(
sanic.http.tls.creators, "TrustmeCreator", MockTrustmeCreator
)
MockTrustmeCreator.SUPPORTED = True
app.config.LOCAL_CERT_CREATOR = "TRUSTME"
with pytest.raises(
SanicException,
match=(
"Sorry, you cannot currently use trustme as a local certificate "
"generator for an HTTP/3 server"
),
):
app.run(version=3, debug=True)
# def test_no_http3_with_trustme(
# app,
# monkeypatch,
# MockTrustmeCreator,
# ):
# monkeypatch.setattr(
# sanic.http.tls.creators, "TrustmeCreator", MockTrustmeCreator
# )
# MockTrustmeCreator.SUPPORTED = True
# app.config.LOCAL_CERT_CREATOR = "TRUSTME"
# with pytest.raises(
# SanicException,
# match=(
# "Sorry, you cannot currently use trustme as a local certificate "
# "generator for an HTTP/3 server"
# ),
# ):
# app.run(version=3, debug=True)
def test_sanic_ssl_context_create():

View File

@@ -1,9 +1,6 @@
import asyncio
# import asyncio
import logging
import os
import platform
import subprocess
import sys
from asyncio import AbstractEventLoop
from string import ascii_lowercase
@@ -19,6 +16,11 @@ from sanic.request import Request
from sanic.response import text
# import platform
# import subprocess
# import sys
pytestmark = pytest.mark.skipif(os.name != "posix", reason="UNIX only")
SOCKPATH = "/tmp/sanictest.sock"
SOCKPATH2 = "/tmp/sanictest2.sock"
@@ -49,6 +51,9 @@ def socket_cleanup():
pass
@pytest.mark.xfail(
reason="Flaky Test on Non Linux Infra",
)
def test_unix_socket_creation(caplog: LogCaptureFixture):
from socket import AF_UNIX, socket
@@ -59,14 +64,14 @@ def test_unix_socket_creation(caplog: LogCaptureFixture):
app = Sanic(name="test")
@app.listener("after_server_start")
def running(app: Sanic, loop: AbstractEventLoop):
@app.after_server_start
def running(app: Sanic):
assert os.path.exists(SOCKPATH)
assert ino != os.stat(SOCKPATH).st_ino
app.stop()
with caplog.at_level(logging.INFO):
app.run(unix=SOCKPATH)
app.run(unix=SOCKPATH, single_process=True)
assert (
"sanic.root",
@@ -79,9 +84,9 @@ def test_unix_socket_creation(caplog: LogCaptureFixture):
@pytest.mark.parametrize("path", (".", "no-such-directory/sanictest.sock"))
def test_invalid_paths(path: str):
app = Sanic(name="test")
#
with pytest.raises((FileExistsError, FileNotFoundError)):
app.run(unix=path)
app.run(unix=path, single_process=True)
def test_dont_replace_file():
@@ -90,12 +95,12 @@ def test_dont_replace_file():
app = Sanic(name="test")
@app.listener("after_server_start")
def stop(app: Sanic, loop: AbstractEventLoop):
@app.after_server_start
def stop(app: Sanic):
app.stop()
with pytest.raises(FileExistsError):
app.run(unix=SOCKPATH)
app.run(unix=SOCKPATH, single_process=True)
def test_dont_follow_symlink():
@@ -107,36 +112,36 @@ def test_dont_follow_symlink():
app = Sanic(name="test")
@app.listener("after_server_start")
def stop(app: Sanic, loop: AbstractEventLoop):
@app.after_server_start
def stop(app: Sanic):
app.stop()
with pytest.raises(FileExistsError):
app.run(unix=SOCKPATH)
app.run(unix=SOCKPATH, single_process=True)
def test_socket_deleted_while_running():
app = Sanic(name="test")
@app.listener("after_server_start")
async def hack(app: Sanic, loop: AbstractEventLoop):
@app.after_server_start
async def hack(app: Sanic):
os.unlink(SOCKPATH)
app.stop()
app.run(host="myhost.invalid", unix=SOCKPATH)
app.run(host="myhost.invalid", unix=SOCKPATH, single_process=True)
def test_socket_replaced_with_file():
app = Sanic(name="test")
@app.listener("after_server_start")
async def hack(app: Sanic, loop: AbstractEventLoop):
@app.after_server_start
async def hack(app: Sanic):
os.unlink(SOCKPATH)
with open(SOCKPATH, "w") as f:
f.write("Not a socket")
app.stop()
app.run(host="myhost.invalid", unix=SOCKPATH)
app.run(host="myhost.invalid", unix=SOCKPATH, single_process=True)
def test_unix_connection():
@@ -146,8 +151,8 @@ def test_unix_connection():
def handler(request: Request):
return text(f"{request.conn_info.server}")
@app.listener("after_server_start")
async def client(app: Sanic, loop: AbstractEventLoop):
@app.after_server_start
async def client(app: Sanic):
if httpx_version >= (0, 20):
transport = httpx.AsyncHTTPTransport(uds=SOCKPATH)
else:
@@ -160,10 +165,7 @@ def test_unix_connection():
finally:
app.stop()
app.run(host="myhost.invalid", unix=SOCKPATH)
app_multi = Sanic(name="test")
app.run(host="myhost.invalid", unix=SOCKPATH, single_process=True)
def handler(request: Request):
@@ -181,86 +183,87 @@ async def client(app: Sanic, loop: AbstractEventLoop):
def test_unix_connection_multiple_workers():
app_multi = Sanic(name="test")
app_multi.get("/")(handler)
app_multi.listener("after_server_start")(client)
app_multi.run(host="myhost.invalid", unix=SOCKPATH, workers=2)
@pytest.mark.xfail(
condition=platform.system() != "Linux",
reason="Flaky Test on Non Linux Infra",
)
async def test_zero_downtime():
"""Graceful server termination and socket replacement on restarts"""
from signal import SIGINT
from time import monotonic as current_time
# @pytest.mark.xfail(
# condition=platform.system() != "Linux",
# reason="Flaky Test on Non Linux Infra",
# )
# async def test_zero_downtime():
# """Graceful server termination and socket replacement on restarts"""
# from signal import SIGINT
# from time import monotonic as current_time
async def client():
if httpx_version >= (0, 20):
transport = httpx.AsyncHTTPTransport(uds=SOCKPATH)
else:
transport = httpcore.AsyncConnectionPool(uds=SOCKPATH)
for _ in range(40):
async with httpx.AsyncClient(transport=transport) as client:
r = await client.get("http://localhost/sleep/0.1")
assert r.status_code == 200, r.text
assert r.text == "Slept 0.1 seconds.\n"
# async def client():
# if httpx_version >= (0, 20):
# transport = httpx.AsyncHTTPTransport(uds=SOCKPATH)
# else:
# transport = httpcore.AsyncConnectionPool(uds=SOCKPATH)
# for _ in range(40):
# async with httpx.AsyncClient(transport=transport) as client:
# r = await client.get("http://localhost/sleep/0.1")
# assert r.status_code == 200, r.text
# assert r.text == "Slept 0.1 seconds.\n"
def spawn():
command = [
sys.executable,
"-m",
"sanic",
"--debug",
"--unix",
SOCKPATH,
"examples.delayed_response.app",
]
DN = subprocess.DEVNULL
return subprocess.Popen(
command, stdin=DN, stdout=DN, stderr=subprocess.PIPE
)
# def spawn():
# command = [
# sys.executable,
# "-m",
# "sanic",
# "--debug",
# "--unix",
# SOCKPATH,
# "examples.delayed_response.app",
# ]
# DN = subprocess.DEVNULL
# return subprocess.Popen(
# command, stdin=DN, stdout=DN, stderr=subprocess.PIPE
# )
try:
processes = [spawn()]
while not os.path.exists(SOCKPATH):
if processes[0].poll() is not None:
raise Exception(
"Worker did not start properly. "
f"stderr: {processes[0].stderr.read()}"
)
await asyncio.sleep(0.0001)
ino = os.stat(SOCKPATH).st_ino
task = asyncio.get_event_loop().create_task(client())
start_time = current_time()
while current_time() < start_time + 6:
# Start a new one and wait until the socket is replaced
processes.append(spawn())
while ino == os.stat(SOCKPATH).st_ino:
await asyncio.sleep(0.001)
ino = os.stat(SOCKPATH).st_ino
# Graceful termination of the previous one
processes[-2].send_signal(SIGINT)
# Wait until client has completed all requests
await task
processes[-1].send_signal(SIGINT)
for worker in processes:
try:
worker.wait(1.0)
except subprocess.TimeoutExpired:
raise Exception(
f"Worker would not terminate:\n{worker.stderr}"
)
finally:
for worker in processes:
worker.kill()
# Test for clean run and termination
return_codes = [worker.poll() for worker in processes]
# try:
# processes = [spawn()]
# while not os.path.exists(SOCKPATH):
# if processes[0].poll() is not None:
# raise Exception(
# "Worker did not start properly. "
# f"stderr: {processes[0].stderr.read()}"
# )
# await asyncio.sleep(0.0001)
# ino = os.stat(SOCKPATH).st_ino
# task = asyncio.get_event_loop().create_task(client())
# start_time = current_time()
# while current_time() < start_time + 6:
# # Start a new one and wait until the socket is replaced
# processes.append(spawn())
# while ino == os.stat(SOCKPATH).st_ino:
# await asyncio.sleep(0.001)
# ino = os.stat(SOCKPATH).st_ino
# # Graceful termination of the previous one
# processes[-2].send_signal(SIGINT)
# # Wait until client has completed all requests
# await task
# processes[-1].send_signal(SIGINT)
# for worker in processes:
# try:
# worker.wait(1.0)
# except subprocess.TimeoutExpired:
# raise Exception(
# f"Worker would not terminate:\n{worker.stderr}"
# )
# finally:
# for worker in processes:
# worker.kill()
# # Test for clean run and termination
# return_codes = [worker.poll() for worker in processes]
# Removing last process which seems to be flappy
return_codes.pop()
assert len(processes) > 5
assert all(code == 0 for code in return_codes)
# # Removing last process which seems to be flappy
# return_codes.pop()
# assert len(processes) > 5
# assert all(code == 0 for code in return_codes)
# Removing this check that seems to be flappy
# assert not os.path.exists(SOCKPATH)
# # Removing this check that seems to be flappy
# # assert not os.path.exists(SOCKPATH)

View File

@@ -0,0 +1,167 @@
import json
from datetime import datetime
from logging import ERROR, INFO
from socket import AF_INET, SOCK_STREAM, timeout
from unittest.mock import Mock, patch
import pytest
from sanic.log import Colors
from sanic.worker.inspector import Inspector, inspect
DATA = {
"info": {
"packages": ["foo"],
},
"extra": {
"more": "data",
},
"workers": {"Worker-Name": {"some": "state"}},
}
SERIALIZED = json.dumps(DATA)
def test_inspector_stop():
inspector = Inspector(Mock(), {}, {}, "", 1)
assert inspector.run is True
inspector.stop()
assert inspector.run is False
@patch("sanic.worker.inspector.sys.stdout.write")
@patch("sanic.worker.inspector.socket")
@pytest.mark.parametrize("command", ("foo", "raw", "pretty"))
def test_send_inspect(socket: Mock, write: Mock, command: str):
socket.return_value = socket
socket.__enter__.return_value = socket
socket.recv.return_value = SERIALIZED.encode()
inspect("localhost", 9999, command)
socket.sendall.assert_called_once_with(command.encode())
socket.recv.assert_called_once_with(4096)
socket.connect.assert_called_once_with(("localhost", 9999))
socket.assert_called_once_with(AF_INET, SOCK_STREAM)
if command == "raw":
write.assert_called_once_with(SERIALIZED)
elif command == "pretty":
write.assert_called()
else:
write.assert_not_called()
@patch("sanic.worker.inspector.sys")
@patch("sanic.worker.inspector.socket")
def test_send_inspect_conn_refused(socket: Mock, sys: Mock, caplog):
with caplog.at_level(INFO):
socket.return_value = socket
socket.__enter__.return_value = socket
socket.connect.side_effect = ConnectionRefusedError()
inspect("localhost", 9999, "foo")
socket.close.assert_called_once()
sys.exit.assert_called_once_with(1)
message = (
f"{Colors.RED}Could not connect to inspector at: "
f"{Colors.YELLOW}('localhost', 9999){Colors.END}\n"
"Either the application is not running, or it did not start "
"an inspector instance."
)
assert ("sanic.error", ERROR, message) in caplog.record_tuples
@patch("sanic.worker.inspector.configure_socket")
@pytest.mark.parametrize("action", (b"reload", b"shutdown", b"foo"))
def test_run_inspector(configure_socket: Mock, action: bytes):
sock = Mock()
conn = Mock()
conn.recv.return_value = action
configure_socket.return_value = sock
inspector = Inspector(Mock(), {}, {}, "localhost", 9999)
inspector.reload = Mock() # type: ignore
inspector.shutdown = Mock() # type: ignore
inspector.state_to_json = Mock(return_value="foo") # type: ignore
def accept():
inspector.run = False
return conn, ...
sock.accept = accept
inspector()
configure_socket.assert_called_once_with(
{"host": "localhost", "port": 9999, "unix": None, "backlog": 1}
)
conn.recv.assert_called_with(64)
if action == b"reload":
conn.send.assert_called_with(b"\n")
inspector.reload.assert_called()
inspector.shutdown.assert_not_called()
inspector.state_to_json.assert_not_called()
elif action == b"shutdown":
conn.send.assert_called_with(b"\n")
inspector.reload.assert_not_called()
inspector.shutdown.assert_called()
inspector.state_to_json.assert_not_called()
else:
conn.send.assert_called_with(b'"foo"')
inspector.reload.assert_not_called()
inspector.shutdown.assert_not_called()
inspector.state_to_json.assert_called()
@patch("sanic.worker.inspector.configure_socket")
def test_accept_timeout(configure_socket: Mock):
sock = Mock()
configure_socket.return_value = sock
inspector = Inspector(Mock(), {}, {}, "localhost", 9999)
inspector.reload = Mock() # type: ignore
inspector.shutdown = Mock() # type: ignore
inspector.state_to_json = Mock(return_value="foo") # type: ignore
def accept():
inspector.run = False
raise timeout
sock.accept = accept
inspector()
inspector.reload.assert_not_called()
inspector.shutdown.assert_not_called()
inspector.state_to_json.assert_not_called()
def test_state_to_json():
now = datetime.now()
now_iso = now.isoformat()
app_info = {"app": "hello"}
worker_state = {"Test": {"now": now, "nested": {"foo": now}}}
inspector = Inspector(Mock(), app_info, worker_state, "", 0)
state = inspector.state_to_json()
assert state == {
"info": app_info,
"workers": {"Test": {"now": now_iso, "nested": {"foo": now_iso}}},
}
def test_reload():
publisher = Mock()
inspector = Inspector(publisher, {}, {}, "", 0)
inspector.reload()
publisher.send.assert_called_once_with("__ALL_PROCESSES__:")
def test_shutdown():
publisher = Mock()
inspector = Inspector(publisher, {}, {}, "", 0)
inspector.shutdown()
publisher.send.assert_called_once_with("__TERMINATE__")

102
tests/worker/test_loader.py Normal file
View File

@@ -0,0 +1,102 @@
import sys
from os import getcwd
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import Mock, patch
import pytest
from sanic.app import Sanic
from sanic.worker.loader import AppLoader, CertLoader
STATIC = Path.cwd() / "tests" / "static"
@pytest.mark.parametrize(
"module_input", ("tests.fake.server:app", "tests.fake.server.app")
)
def test_load_app_instance(module_input):
loader = AppLoader(module_input)
app = loader.load()
assert isinstance(app, Sanic)
@pytest.mark.parametrize(
"module_input",
("tests.fake.server:create_app", "tests.fake.server:create_app()"),
)
def test_load_app_factory(module_input):
loader = AppLoader(module_input, as_factory=True)
app = loader.load()
assert isinstance(app, Sanic)
def test_load_app_simple():
loader = AppLoader(str(STATIC), as_simple=True)
app = loader.load()
assert isinstance(app, Sanic)
def test_create_with_factory():
loader = AppLoader(factory=lambda: Sanic("Test"))
app = loader.load()
assert isinstance(app, Sanic)
def test_cwd_in_path():
AppLoader("tests.fake.server:app").load()
assert getcwd() in sys.path
def test_input_is_dir():
loader = AppLoader(str(STATIC))
message = (
"App not found.\n Please use --simple if you are passing a "
f"directory to sanic.\n eg. sanic {str(STATIC)} --simple"
)
with pytest.raises(ValueError, match=message):
loader.load()
def test_input_is_factory():
ns = SimpleNamespace(module="foo")
loader = AppLoader("tests.fake.server:create_app", args=ns)
message = (
"Module is not a Sanic app, it is a function\n If this callable "
"returns a Sanic instance try: \nsanic foo --factory"
)
with pytest.raises(ValueError, match=message):
loader.load()
def test_input_is_module():
ns = SimpleNamespace(module="foo")
loader = AppLoader("tests.fake.server", args=ns)
message = (
"Module is not a Sanic app, it is a module\n "
"Perhaps you meant foo:app?"
)
with pytest.raises(ValueError, match=message):
loader.load()
@pytest.mark.parametrize("creator", ("mkcert", "trustme"))
@patch("sanic.worker.loader.TrustmeCreator")
@patch("sanic.worker.loader.MkcertCreator")
def test_cert_loader(MkcertCreator: Mock, TrustmeCreator: Mock, creator: str):
MkcertCreator.return_value = MkcertCreator
TrustmeCreator.return_value = TrustmeCreator
data = {
"creator": creator,
"key": Path.cwd() / "tests" / "certs" / "localhost" / "privkey.pem",
"cert": Path.cwd() / "tests" / "certs" / "localhost" / "fullchain.pem",
"localhost": "localhost",
}
app = Sanic("Test")
loader = CertLoader(data) # type: ignore
loader.load(app)
creator_class = MkcertCreator if creator == "mkcert" else TrustmeCreator
creator_class.assert_called_once_with(app, data["key"], data["cert"])
creator_class.generate_cert.assert_called_once_with("localhost")

View File

@@ -0,0 +1,217 @@
from signal import SIGINT, SIGKILL
from unittest.mock import Mock, call, patch
import pytest
from sanic.worker.manager import WorkerManager
def fake_serve():
...
def test_manager_no_workers():
message = "Cannot serve with no workers"
with pytest.raises(RuntimeError, match=message):
WorkerManager(
0,
fake_serve,
{},
Mock(),
(Mock(), Mock()),
{},
)
@patch("sanic.worker.process.os")
def test_terminate(os_mock: Mock):
process = Mock()
process.pid = 1234
context = Mock()
context.Process.return_value = process
manager = WorkerManager(
1,
fake_serve,
{},
context,
(Mock(), Mock()),
{},
)
assert manager.terminated is False
manager.terminate()
assert manager.terminated is True
os_mock.kill.assert_called_once_with(1234, SIGINT)
@patch("sanic.worker.process.os")
def test_shutown(os_mock: Mock):
process = Mock()
process.pid = 1234
process.is_alive.return_value = True
context = Mock()
context.Process.return_value = process
manager = WorkerManager(
1,
fake_serve,
{},
context,
(Mock(), Mock()),
{},
)
manager.shutdown()
os_mock.kill.assert_called_once_with(1234, SIGINT)
@patch("sanic.worker.manager.os")
def test_kill(os_mock: Mock):
process = Mock()
process.pid = 1234
context = Mock()
context.Process.return_value = process
manager = WorkerManager(
1,
fake_serve,
{},
context,
(Mock(), Mock()),
{},
)
manager.kill()
os_mock.kill.assert_called_once_with(1234, SIGKILL)
def test_restart_all():
p1 = Mock()
p2 = Mock()
context = Mock()
context.Process.side_effect = [p1, p2, p1, p2]
manager = WorkerManager(
2,
fake_serve,
{},
context,
(Mock(), Mock()),
{},
)
assert len(list(manager.transient_processes))
manager.restart()
p1.terminate.assert_called_once()
p2.terminate.assert_called_once()
context.Process.assert_has_calls(
[
call(
name="Sanic-Server-0-0",
target=fake_serve,
kwargs={"config": {}},
daemon=True,
),
call(
name="Sanic-Server-1-0",
target=fake_serve,
kwargs={"config": {}},
daemon=True,
),
call(
name="Sanic-Server-0-0",
target=fake_serve,
kwargs={"config": {}},
daemon=True,
),
call(
name="Sanic-Server-1-0",
target=fake_serve,
kwargs={"config": {}},
daemon=True,
),
]
)
def test_monitor_all():
p1 = Mock()
p2 = Mock()
sub = Mock()
sub.recv.side_effect = ["__ALL_PROCESSES__:", ""]
context = Mock()
context.Process.side_effect = [p1, p2]
manager = WorkerManager(
2,
fake_serve,
{},
context,
(Mock(), sub),
{},
)
manager.restart = Mock() # type: ignore
manager.wait_for_ack = Mock() # type: ignore
manager.monitor()
manager.restart.assert_called_once_with(
process_names=None, reloaded_files=""
)
def test_monitor_all_with_files():
p1 = Mock()
p2 = Mock()
sub = Mock()
sub.recv.side_effect = ["__ALL_PROCESSES__:foo,bar", ""]
context = Mock()
context.Process.side_effect = [p1, p2]
manager = WorkerManager(
2,
fake_serve,
{},
context,
(Mock(), sub),
{},
)
manager.restart = Mock() # type: ignore
manager.wait_for_ack = Mock() # type: ignore
manager.monitor()
manager.restart.assert_called_once_with(
process_names=None, reloaded_files="foo,bar"
)
def test_monitor_one_process():
p1 = Mock()
p1.name = "Testing"
p2 = Mock()
sub = Mock()
sub.recv.side_effect = [f"{p1.name}:foo,bar", ""]
context = Mock()
context.Process.side_effect = [p1, p2]
manager = WorkerManager(
2,
fake_serve,
{},
context,
(Mock(), sub),
{},
)
manager.restart = Mock() # type: ignore
manager.wait_for_ack = Mock() # type: ignore
manager.monitor()
manager.restart.assert_called_once_with(
process_names=[p1.name], reloaded_files="foo,bar"
)
def test_shutdown_signal():
pub = Mock()
manager = WorkerManager(
1,
fake_serve,
{},
Mock(),
(pub, Mock()),
{},
)
manager.shutdown = Mock() # type: ignore
manager.shutdown_signal(SIGINT, None)
pub.send.assert_called_with(None)
manager.shutdown.assert_called_once_with()

View File

@@ -0,0 +1,119 @@
from multiprocessing import Event
from os import environ, getpid
from typing import Any, Dict
from unittest.mock import Mock
import pytest
from sanic import Sanic
from sanic.worker.multiplexer import WorkerMultiplexer
from sanic.worker.state import WorkerState
@pytest.fixture
def monitor_publisher():
return Mock()
@pytest.fixture
def worker_state():
return {}
@pytest.fixture
def m(monitor_publisher, worker_state):
environ["SANIC_WORKER_NAME"] = "Test"
worker_state["Test"] = {}
yield WorkerMultiplexer(monitor_publisher, worker_state)
del environ["SANIC_WORKER_NAME"]
def test_has_multiplexer_default(app: Sanic):
event = Event()
@app.main_process_start
async def setup(app, _):
app.shared_ctx.event = event
@app.after_server_start
def stop(app):
if hasattr(app, "m") and isinstance(app.m, WorkerMultiplexer):
app.shared_ctx.event.set()
app.stop()
app.run()
assert event.is_set()
def test_not_have_multiplexer_single(app: Sanic):
event = Event()
@app.main_process_start
async def setup(app, _):
app.shared_ctx.event = event
@app.after_server_start
def stop(app):
if hasattr(app, "m") and isinstance(app.m, WorkerMultiplexer):
app.shared_ctx.event.set()
app.stop()
app.run(single_process=True)
assert not event.is_set()
def test_not_have_multiplexer_legacy(app: Sanic):
event = Event()
@app.main_process_start
async def setup(app, _):
app.shared_ctx.event = event
@app.after_server_start
def stop(app):
if hasattr(app, "m") and isinstance(app.m, WorkerMultiplexer):
app.shared_ctx.event.set()
app.stop()
app.run(legacy=True)
assert not event.is_set()
def test_ack(worker_state: Dict[str, Any], m: WorkerMultiplexer):
worker_state["Test"] = {"foo": "bar"}
m.ack()
assert worker_state["Test"] == {"foo": "bar", "state": "ACKED"}
def test_restart_self(monitor_publisher: Mock, m: WorkerMultiplexer):
m.restart()
monitor_publisher.send.assert_called_once_with("Test")
def test_restart_foo(monitor_publisher: Mock, m: WorkerMultiplexer):
m.restart("foo")
monitor_publisher.send.assert_called_once_with("foo")
def test_reload_alias(monitor_publisher: Mock, m: WorkerMultiplexer):
m.reload()
monitor_publisher.send.assert_called_once_with("Test")
def test_terminate(monitor_publisher: Mock, m: WorkerMultiplexer):
m.terminate()
monitor_publisher.send.assert_called_once_with("__TERMINATE__")
def test_properties(
monitor_publisher: Mock, worker_state: Dict[str, Any], m: WorkerMultiplexer
):
assert m.reload == m.restart
assert m.pid == getpid()
assert m.name == "Test"
assert m.workers == worker_state
assert m.state == worker_state["Test"]
assert isinstance(m.state, WorkerState)

View File

@@ -0,0 +1,156 @@
import signal
from asyncio import Event
from pathlib import Path
from unittest.mock import Mock
import pytest
from sanic.app import Sanic
from sanic.worker.loader import AppLoader
from sanic.worker.reloader import Reloader
@pytest.fixture
def reloader():
...
@pytest.fixture
def app():
app = Sanic("Test")
@app.route("/")
def handler(_):
...
return app
@pytest.fixture
def app_loader(app):
return AppLoader(factory=lambda: app)
def run_reloader(reloader):
def stop(*_):
reloader.stop()
signal.signal(signal.SIGALRM, stop)
signal.alarm(1)
reloader()
def is_python_file(filename):
return (isinstance(filename, Path) and (filename.suffix == "py")) or (
isinstance(filename, str) and filename.endswith(".py")
)
def test_reload_send():
publisher = Mock()
reloader = Reloader(publisher, 0.1, set(), Mock())
reloader.reload("foobar")
publisher.send.assert_called_once_with("__ALL_PROCESSES__:foobar")
def test_iter_files():
reloader = Reloader(Mock(), 0.1, set(), Mock())
len_python_files = len(list(reloader.files()))
assert len_python_files > 0
static_dir = Path(__file__).parent.parent / "static"
len_static_files = len(list(static_dir.glob("**/*")))
reloader = Reloader(Mock(), 0.1, set({static_dir}), Mock())
len_total_files = len(list(reloader.files()))
assert len_static_files > 0
assert len_total_files == len_python_files + len_static_files
def test_reloader_triggers_start_stop_listeners(
app: Sanic, app_loader: AppLoader
):
results = []
@app.reload_process_start
def reload_process_start(_):
results.append("reload_process_start")
@app.reload_process_stop
def reload_process_stop(_):
results.append("reload_process_stop")
reloader = Reloader(Mock(), 0.1, set(), app_loader)
run_reloader(reloader)
assert results == ["reload_process_start", "reload_process_stop"]
def test_not_triggered(app_loader):
reload_dir = Path(__file__).parent.parent / "fake"
publisher = Mock()
reloader = Reloader(publisher, 0.1, {reload_dir}, app_loader)
run_reloader(reloader)
publisher.send.assert_not_called()
def test_triggered(app_loader):
paths = set()
def check_file(filename, mtimes):
if (isinstance(filename, Path) and (filename.name == "server.py")) or (
isinstance(filename, str) and "sanic/app.py" in filename
):
paths.add(str(filename))
return True
return False
reload_dir = Path(__file__).parent.parent / "fake"
publisher = Mock()
reloader = Reloader(publisher, 0.1, {reload_dir}, app_loader)
reloader.check_file = check_file # type: ignore
run_reloader(reloader)
assert len(paths) == 2
publisher.send.assert_called()
call_arg = publisher.send.call_args_list[0][0][0]
assert call_arg.startswith("__ALL_PROCESSES__:")
assert call_arg.count(",") == 1
for path in paths:
assert str(path) in call_arg
def test_reloader_triggers_reload_listeners(app: Sanic, app_loader: AppLoader):
before = Event()
after = Event()
def check_file(filename, mtimes):
return not after.is_set()
@app.before_reload_trigger
async def before_reload_trigger(_):
before.set()
@app.after_reload_trigger
async def after_reload_trigger(_):
after.set()
reloader = Reloader(Mock(), 0.1, set(), app_loader)
reloader.check_file = check_file # type: ignore
run_reloader(reloader)
assert before.is_set()
assert after.is_set()
def test_check_file(tmp_path):
current = tmp_path / "testing.txt"
current.touch()
mtimes = {}
assert Reloader.check_file(current, mtimes) is False
assert len(mtimes) == 1
assert Reloader.check_file(current, mtimes) is False
mtimes[current] = mtimes[current] - 1
assert Reloader.check_file(current, mtimes) is True

View File

@@ -0,0 +1,53 @@
from unittest.mock import Mock, call, patch
import pytest
from sanic.app import Sanic
from sanic.http.constants import HTTP
from sanic.server.runners import _run_server_forever, serve
@patch("sanic.server.runners._serve_http_1")
@patch("sanic.server.runners._serve_http_3")
def test_run_http_1(_serve_http_3: Mock, _serve_http_1: Mock, app: Sanic):
serve("", 0, app)
_serve_http_3.assert_not_called()
_serve_http_1.assert_called_once()
@patch("sanic.server.runners._serve_http_1")
@patch("sanic.server.runners._serve_http_3")
def test_run_http_3(_serve_http_3: Mock, _serve_http_1: Mock, app: Sanic):
serve("", 0, app, version=HTTP.VERSION_3)
_serve_http_1.assert_not_called()
_serve_http_3.assert_called_once()
@patch("sanic.server.runners.remove_unix_socket")
@pytest.mark.parametrize("do_cleanup", (True, False))
def test_run_server_forever(remove_unix_socket: Mock, do_cleanup: bool):
loop = Mock()
cleanup = Mock()
loop.run_forever = Mock(side_effect=KeyboardInterrupt())
before_stop = Mock()
before_stop.return_value = Mock()
after_stop = Mock()
after_stop.return_value = Mock()
unix = Mock()
_run_server_forever(
loop, before_stop, after_stop, cleanup if do_cleanup else None, unix
)
loop.run_forever.assert_called_once_with()
loop.run_until_complete.assert_has_calls(
[call(before_stop.return_value), call(after_stop.return_value)]
)
if do_cleanup:
cleanup.assert_called_once_with()
else:
cleanup.assert_not_called()
remove_unix_socket.assert_called_once_with(unix)
loop.close.assert_called_once_with()

View File

@@ -0,0 +1,82 @@
# 18
# 21-29
# 26
# 36-37
# 42
# 55
# 38->
import logging
from ctypes import c_int32
from multiprocessing import Pipe, Queue, Value
from os import environ
from typing import Any
import pytest
from sanic.types.shared_ctx import SharedContext
@pytest.mark.parametrize(
"item,okay",
(
(Pipe(), True),
(Value("i", 0), True),
(Queue(), True),
(c_int32(1), True),
(1, False),
("thing", False),
(object(), False),
),
)
def test_set_items(item: Any, okay: bool, caplog):
ctx = SharedContext()
with caplog.at_level(logging.INFO):
ctx.item = item
assert ctx.is_locked is False
assert len(caplog.record_tuples) == 0 if okay else 1
if not okay:
assert caplog.record_tuples[0][0] == "sanic.error"
assert caplog.record_tuples[0][1] == logging.WARNING
assert "Unsafe object" in caplog.record_tuples[0][2]
@pytest.mark.parametrize(
"item",
(
Pipe(),
Value("i", 0),
Queue(),
c_int32(1),
1,
"thing",
object(),
),
)
def test_set_items_in_worker(item: Any, caplog):
ctx = SharedContext()
environ["SANIC_WORKER_NAME"] = "foo"
with caplog.at_level(logging.INFO):
ctx.item = item
del environ["SANIC_WORKER_NAME"]
assert ctx.is_locked is False
assert len(caplog.record_tuples) == 0
def test_lock():
ctx = SharedContext()
assert ctx.is_locked is False
ctx.lock()
assert ctx.is_locked is True
message = "Cannot set item on locked SharedContext object"
with pytest.raises(RuntimeError, match=message):
ctx.item = 1

View File

@@ -0,0 +1,27 @@
from pathlib import Path
from sanic.server.socket import (
bind_unix_socket,
configure_socket,
remove_unix_socket,
)
def test_setup_and_teardown_unix():
socket_address = "./test.sock"
path = Path.cwd() / socket_address
assert not path.exists()
bind_unix_socket(socket_address)
assert path.exists()
remove_unix_socket(socket_address)
assert not path.exists()
def test_configure_socket():
socket_address = "./test.sock"
path = Path.cwd() / socket_address
assert not path.exists()
configure_socket({"unix": socket_address, "backlog": 100})
assert path.exists()
remove_unix_socket(socket_address)
assert not path.exists()

View File

@@ -0,0 +1,91 @@
import pytest
from sanic.worker.state import WorkerState
def gen_state(**kwargs):
return WorkerState({"foo": kwargs}, "foo")
def test_set_get_state():
state = gen_state()
state["additional"] = 123
assert state["additional"] == 123
assert state.get("additional") == 123
assert state._state == {"foo": {"additional": 123}}
def test_del_state():
state = gen_state(one=1)
assert state["one"] == 1
del state["one"]
assert state._state == {"foo": {}}
def test_iter_state():
result = [item for item in gen_state(one=1, two=2)]
assert result == ["one", "two"]
def test_state_len():
result = [item for item in gen_state(one=1, two=2)]
assert len(result) == 2
def test_state_repr():
assert repr(gen_state(one=1, two=2)) == repr({"one": 1, "two": 2})
def test_state_eq():
state = gen_state(one=1, two=2)
assert state == {"one": 1, "two": 2}
assert state != {"one": 1}
def test_state_keys():
assert list(gen_state(one=1, two=2).keys()) == list(
{"one": 1, "two": 2}.keys()
)
def test_state_values():
assert list(gen_state(one=1, two=2).values()) == list(
{"one": 1, "two": 2}.values()
)
def test_state_items():
assert list(gen_state(one=1, two=2).items()) == list(
{"one": 1, "two": 2}.items()
)
def test_state_update():
state = gen_state()
assert len(state) == 0
state.update({"nine": 9})
assert len(state) == 1
assert state["nine"] == 9
def test_state_pop():
state = gen_state(one=1)
with pytest.raises(NotImplementedError):
state.pop()
def test_state_full():
state = gen_state(one=1)
assert state.full() == {"foo": {"one": 1}}
@pytest.mark.parametrize("key", WorkerState.RESTRICTED)
def test_state_restricted_operation(key):
state = gen_state()
message = f"Cannot set restricted key on WorkerState: {key}"
with pytest.raises(LookupError, match=message):
state[key] = "Nope"
del state[key]
with pytest.raises(LookupError, match=message):
state.update({"okay": True, key: "bad"})

View File

@@ -0,0 +1,113 @@
from os import environ
from unittest.mock import Mock, patch
import pytest
from sanic.app import Sanic
from sanic.worker.loader import AppLoader
from sanic.worker.multiplexer import WorkerMultiplexer
from sanic.worker.serve import worker_serve
@pytest.fixture
def mock_app():
app = Mock()
server_info = Mock()
server_info.settings = {"app": app}
app.state.workers = 1
app.listeners = {"main_process_ready": []}
app.get_motd_data.return_value = ({"packages": ""}, {})
app.state.server_info = [server_info]
return app
def args(app, **kwargs):
params = {**kwargs}
params.setdefault("host", "127.0.0.1")
params.setdefault("port", 9999)
params.setdefault("app_name", "test_config_app")
params.setdefault("monitor_publisher", None)
params.setdefault("app_loader", AppLoader(factory=lambda: app))
return params
def test_config_app(mock_app: Mock):
with patch("sanic.worker.serve._serve_http_1"):
worker_serve(**args(mock_app, config={"FOO": "BAR"}))
mock_app.update_config.assert_called_once_with({"FOO": "BAR"})
def test_bad_process(mock_app: Mock):
environ["SANIC_WORKER_NAME"] = "FOO"
message = "No restart publisher found in worker process"
with pytest.raises(RuntimeError, match=message):
worker_serve(**args(mock_app))
message = "No worker state found in worker process"
with pytest.raises(RuntimeError, match=message):
worker_serve(**args(mock_app, monitor_publisher=Mock()))
del environ["SANIC_WORKER_NAME"]
def test_has_multiplexer(app: Sanic):
environ["SANIC_WORKER_NAME"] = "FOO"
Sanic.register_app(app)
with patch("sanic.worker.serve._serve_http_1"):
worker_serve(
**args(app, monitor_publisher=Mock(), worker_state=Mock())
)
assert isinstance(app.multiplexer, WorkerMultiplexer)
del environ["SANIC_WORKER_NAME"]
@patch("sanic.mixins.startup.WorkerManager")
def test_serve_app_implicit(wm: Mock, app):
app.prepare()
Sanic.serve()
wm.call_args[0] == app.state.workers
@patch("sanic.mixins.startup.WorkerManager")
def test_serve_app_explicit(wm: Mock, mock_app):
Sanic.serve(mock_app)
wm.call_args[0] == mock_app.state.workers
@patch("sanic.mixins.startup.WorkerManager")
def test_serve_app_loader(wm: Mock, mock_app):
Sanic.serve(app_loader=AppLoader(factory=lambda: mock_app))
wm.call_args[0] == mock_app.state.workers
# Sanic.serve(factory=lambda: mock_app)
@patch("sanic.mixins.startup.WorkerManager")
def test_serve_app_factory(wm: Mock, mock_app):
Sanic.serve(factory=lambda: mock_app)
wm.call_args[0] == mock_app.state.workers
@patch("sanic.mixins.startup.WorkerManager")
@patch("sanic.mixins.startup.Inspector")
@pytest.mark.parametrize("config", (True, False))
def test_serve_with_inspector(
Inspector: Mock, WorkerManager: Mock, mock_app: Mock, config: bool
):
mock_app.config.INSPECTOR = config
inspector = Mock()
Inspector.return_value = inspector
WorkerManager.return_value = WorkerManager
Sanic.serve(mock_app)
if config:
Inspector.assert_called_once()
WorkerManager.manage.assert_called_once_with(
"Inspector", inspector, {}, transient=False
)
else:
Inspector.assert_not_called()
WorkerManager.manage.assert_not_called()