Listeners for main server process (#2018)

* Initial POC

* Add test case

* Resolve create_server and Gunicorn serve)

* add testr coverage
This commit is contained in:
Adam Hopkins 2021-03-01 15:03:26 +02:00 committed by GitHub
parent 170177feb0
commit 4b968dc611
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 99 additions and 3 deletions

View File

@ -59,6 +59,7 @@ from sanic.server import (
Signal, Signal,
serve, serve,
serve_multiple, serve_multiple,
serve_single,
) )
from sanic.websocket import ConnectionClosed, WebSocketProtocol from sanic.websocket import ConnectionClosed, WebSocketProtocol
@ -817,7 +818,7 @@ class Sanic(BaseSanic):
) )
workers = 1 workers = 1
if workers == 1: if workers == 1:
serve(**server_settings) serve_single(server_settings)
else: else:
serve_multiple(server_settings, workers) serve_multiple(server_settings, workers)
except BaseException: except BaseException:
@ -920,6 +921,13 @@ class Sanic(BaseSanic):
server_settings.get("before_start", []), server_settings.get("before_start", []),
server_settings.get("loop"), server_settings.get("loop"),
) )
main_start = server_settings.pop("main_start", None)
main_stop = server_settings.pop("main_stop", None)
if main_start or main_stop:
logger.warning(
"Listener events for the main process are not available "
"with create_server()"
)
return await serve( return await serve(
asyncio_server_kwargs=asyncio_server_kwargs, **server_settings asyncio_server_kwargs=asyncio_server_kwargs, **server_settings
@ -1038,6 +1046,8 @@ class Sanic(BaseSanic):
("after_server_start", "after_start", False), ("after_server_start", "after_start", False),
("before_server_stop", "before_stop", True), ("before_server_stop", "before_stop", True),
("after_server_stop", "after_stop", True), ("after_server_stop", "after_stop", True),
("main_process_start", "main_start", False),
("main_process_stop", "main_stop", True),
): ):
listeners = self.listeners[event_name].copy() listeners = self.listeners[event_name].copy()
if reverse: if reverse:

View File

@ -13,6 +13,8 @@ class ListenerEvent(str, Enum):
AFTER_SERVER_START = auto() AFTER_SERVER_START = auto()
BEFORE_SERVER_STOP = auto() BEFORE_SERVER_STOP = auto()
AFTER_SERVER_STOP = auto() AFTER_SERVER_STOP = auto()
MAIN_PROCESS_START = auto()
MAIN_PROCESS_STOP = auto()
class ListenerMixin: class ListenerMixin:

View File

@ -490,7 +490,7 @@ def serve(
create_server method create_server method
:return: Nothing :return: Nothing
""" """
if not run_async: if not run_async and not loop:
# create new event_loop after fork # create new event_loop after fork
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
@ -603,7 +603,6 @@ def serve(
trigger_events(after_stop, loop) trigger_events(after_stop, loop)
loop.close()
remove_unix_socket(unix) remove_unix_socket(unix)
@ -700,6 +699,23 @@ def remove_unix_socket(path: Optional[str]) -> None:
pass pass
def serve_single(server_settings):
main_start = server_settings.pop("main_start", None)
main_stop = server_settings.pop("main_stop", None)
if not server_settings.get("run_async"):
# create new event_loop after fork
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
server_settings["loop"] = loop
trigger_events(main_start, server_settings["loop"])
serve(**server_settings)
trigger_events(main_stop, server_settings["loop"])
server_settings["loop"].close()
def serve_multiple(server_settings, workers): def serve_multiple(server_settings, workers):
"""Start multiple server processes simultaneously. Stop on interrupt """Start multiple server processes simultaneously. Stop on interrupt
and terminate signals, and drain connections when complete. and terminate signals, and drain connections when complete.
@ -712,6 +728,13 @@ def serve_multiple(server_settings, workers):
server_settings["reuse_port"] = True server_settings["reuse_port"] = True
server_settings["run_multiple"] = True server_settings["run_multiple"] = True
main_start = server_settings.pop("main_start", None)
main_stop = server_settings.pop("main_stop", None)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
trigger_events(main_start, loop)
# Create a listening socket or use the one in settings # Create a listening socket or use the one in settings
sock = server_settings.get("sock") sock = server_settings.get("sock")
unix = server_settings["unix"] unix = server_settings["unix"]
@ -752,5 +775,8 @@ def serve_multiple(server_settings, workers):
for process in processes: for process in processes:
process.terminate() process.terminate()
trigger_events(main_stop, loop)
sock.close() sock.close()
loop.close()
remove_unix_socket(unix) remove_unix_socket(unix)

View File

@ -7,6 +7,7 @@ import traceback
from gunicorn.workers import base as base # type: ignore from gunicorn.workers import base as base # type: ignore
from sanic.log import logger
from sanic.server import HttpProtocol, Signal, serve, trigger_events from sanic.server import HttpProtocol, Signal, serve, trigger_events
from sanic.websocket import WebSocketProtocol from sanic.websocket import WebSocketProtocol
@ -72,6 +73,15 @@ class GunicornWorker(base.Worker):
) )
self._server_settings["before_start"] = () self._server_settings["before_start"] = ()
main_start = self._server_settings.pop("main_start", None)
main_stop = self._server_settings.pop("main_stop", None)
if main_start or main_stop: # noqa
logger.warning(
"Listener events for the main process are not available "
"with GunicornWorker"
)
self._runner = asyncio.ensure_future(self._run(), loop=self.loop) self._runner = asyncio.ensure_future(self._run(), loop=self.loop)
try: try:
self.loop.run_until_complete(self._runner) self.loop.run_until_complete(self._runner)

View File

@ -70,6 +70,20 @@ def test_asyncio_server_start_serving(app):
# Looks like we can't easily test `serve_forever()` # Looks like we can't easily test `serve_forever()`
def test_create_server_main(app, caplog):
app.listener("main_process_start")(lambda *_: ...)
loop = asyncio.get_event_loop()
with caplog.at_level(logging.INFO):
asyncio_srv_coro = app.create_server(return_asyncio_server=True)
loop.run_until_complete(asyncio_srv_coro)
assert (
"sanic.root",
30,
"Listener events for the main process are not available with "
"create_server()",
) in caplog.record_tuples
def test_app_loop_not_running(app): def test_app_loop_not_running(app):
with pytest.raises(SanicException) as excinfo: with pytest.raises(SanicException) as excinfo:
app.loop app.loop

View File

@ -1,3 +1,4 @@
import logging
import multiprocessing import multiprocessing
import pickle import pickle
import random import random
@ -8,6 +9,7 @@ import pytest
from sanic_testing.testing import HOST, PORT from sanic_testing.testing import HOST, PORT
from sanic import Blueprint from sanic import Blueprint
from sanic.log import logger
from sanic.response import text from sanic.response import text
@ -108,3 +110,35 @@ def test_pickle_app_with_static(app, protocol):
assert up_p_app assert up_p_app
request, response = up_p_app.test_client.get("/static/missing.txt") request, response = up_p_app.test_client.get("/static/missing.txt")
assert response.status == 404 assert response.status == 404
def test_main_process_event(app, caplog):
# Selects a number at random so we can spot check
num_workers = random.choice(range(2, multiprocessing.cpu_count() * 2 + 1))
def stop_on_alarm(*args):
for process in multiprocessing.active_children():
process.terminate()
signal.signal(signal.SIGALRM, stop_on_alarm)
signal.alarm(1)
@app.listener("main_process_start")
def main_process_start(app, loop):
logger.info("main_process_start")
@app.listener("main_process_stop")
def main_process_stop(app, loop):
logger.info("main_process_stop")
with caplog.at_level(logging.INFO):
app.run(HOST, PORT, workers=num_workers)
assert (
caplog.record_tuples.count(("sanic.root", 20, "main_process_start"))
== 1
)
assert (
caplog.record_tuples.count(("sanic.root", 20, "main_process_stop"))
== 1
)