Sanic multi-application server (#2347)

This commit is contained in:
Adam Hopkins 2022-01-16 09:03:04 +02:00 committed by GitHub
parent 4a416e177a
commit b8d991420b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1254 additions and 1017 deletions

View File

@ -7,10 +7,11 @@ on:
tags:
- "!*" # Do not execute on tags
pull_request:
types: [opened, synchronize, reopened, ready_for_review]
branches:
- main
- "*LTS"
jobs:
test:
if: github.event.pull_request.draft == false
runs-on: ${{ matrix.os }}
strategy:
matrix:
@ -30,6 +31,7 @@ jobs:
pip install tox
- name: Run coverage
run: tox -e coverage
continue-on-error: true
- uses: codecov/codecov-action@v2
with:
files: ./coverage.xml

View File

@ -3,28 +3,24 @@ from __future__ import annotations
import asyncio
import logging
import logging.config
import os
import platform
import re
import sys
from asyncio import (
AbstractEventLoop,
CancelledError,
Protocol,
Task,
ensure_future,
get_event_loop,
get_running_loop,
wait_for,
)
from asyncio.futures import Future
from collections import defaultdict, deque
from contextlib import suppress
from functools import partial
from importlib import import_module
from inspect import isawaitable
from pathlib import Path
from socket import socket
from ssl import SSLContext
from traceback import format_exc
from types import SimpleNamespace
from typing import (
@ -54,11 +50,8 @@ from sanic_routing.exceptions import ( # type: ignore
)
from sanic_routing.route import Route # type: ignore
from sanic import reloader_helpers
from sanic.application.ext import setup_ext
from sanic.application.logo import get_logo
from sanic.application.motd import MOTD
from sanic.application.state import ApplicationState, Mode
from sanic.application.state import ApplicationState, Mode, ServerStage
from sanic.asgi import ASGIApp
from sanic.base.root import BaseSanic
from sanic.blueprint_group import BlueprintGroup
@ -72,16 +65,15 @@ from sanic.exceptions import (
URLBuildError,
)
from sanic.handlers import ErrorHandler
from sanic.helpers import _default
from sanic.http import Stage
from sanic.log import (
LOGGING_CONFIG_DEFAULTS,
Colors,
deprecation,
error_logger,
logger,
)
from sanic.mixins.listeners import ListenerEvent
from sanic.mixins.runner import RunnerMixin
from sanic.models.futures import (
FutureException,
FutureListener,
@ -96,13 +88,8 @@ from sanic.models.handler_types import Sanic as SanicVar
from sanic.request import Request
from sanic.response import BaseHTTPResponse, HTTPResponse, ResponseStream
from sanic.router import Router
from sanic.server import AsyncioServer, HttpProtocol
from sanic.server import Signal as ServerSignal
from sanic.server import serve, serve_multiple, serve_single, try_use_uvloop
from sanic.server.protocols.websocket_protocol import WebSocketProtocol
from sanic.server.websockets.impl import ConnectionClosed
from sanic.signals import Signal, SignalRouter
from sanic.tls import process_to_context
from sanic.touchup import TouchUp, TouchUpMeta
@ -119,10 +106,8 @@ if OS_IS_WINDOWS: # no cov
filterwarnings("once", category=DeprecationWarning)
SANIC_PACKAGES = ("sanic-routing", "sanic-testing", "sanic-ext")
class Sanic(BaseSanic, metaclass=TouchUpMeta):
class Sanic(BaseSanic, RunnerMixin, metaclass=TouchUpMeta):
"""
The main application instance
"""
@ -221,7 +206,6 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
self.blueprints: Dict[str, Blueprint] = {}
self.configure_logging: bool = configure_logging
self.ctx: Any = ctx or SimpleNamespace()
self.debug = False
self.error_handler: ErrorHandler = error_handler or ErrorHandler()
self.listeners: Dict[str, List[ListenerType[Any]]] = defaultdict(list)
self.named_request_middleware: Dict[str, Deque[MiddlewareType]] = {}
@ -265,7 +249,7 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
Only supported when using the `app.run` method.
"""
if not self.is_running and self.asgi is False:
if self.state.stage is ServerStage.STOPPED and self.asgi is False:
raise SanicException(
"Loop can only be retrieved after the app has started "
"running. Not supported with `create_server` function"
@ -1052,286 +1036,6 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
# Execution
# -------------------------------------------------------------------- #
def make_coffee(self, *args, **kwargs):
self.state.coffee = True
self.run(*args, **kwargs)
def run(
self,
host: Optional[str] = None,
port: Optional[int] = None,
*,
dev: bool = False,
debug: bool = False,
auto_reload: Optional[bool] = None,
ssl: Union[None, SSLContext, dict, str, list, tuple] = None,
sock: Optional[socket] = None,
workers: int = 1,
protocol: Optional[Type[Protocol]] = None,
backlog: int = 100,
register_sys_signals: bool = True,
access_log: Optional[bool] = None,
unix: Optional[str] = None,
loop: AbstractEventLoop = None,
reload_dir: Optional[Union[List[str], str]] = None,
noisy_exceptions: Optional[bool] = None,
motd: bool = True,
fast: bool = False,
verbosity: int = 0,
motd_display: Optional[Dict[str, str]] = None,
) -> None:
"""
Run the HTTP Server and listen until keyboard interrupt or term
signal. On termination, drain connections before closing.
:param host: Address to host on
:type host: str
:param port: Port to host on
:type port: int
:param debug: Enables debug output (slows server)
:type debug: bool
:param auto_reload: Reload app whenever its source code is changed.
Enabled by default in debug mode.
:type auto_relaod: bool
:param ssl: SSLContext, or location of certificate and key
for SSL encryption of worker(s)
:type ssl: str, dict, SSLContext or list
:param sock: Socket for the server to accept connections from
:type sock: socket
:param workers: Number of processes received before it is respected
:type workers: int
:param protocol: Subclass of asyncio Protocol class
:type protocol: type[Protocol]
:param backlog: a number of unaccepted connections that the system
will allow before refusing new connections
:type backlog: int
:param register_sys_signals: Register SIG* events
:type register_sys_signals: bool
:param access_log: Enables writing access logs (slows server)
:type access_log: bool
:param unix: Unix socket to listen on instead of TCP port
:type unix: str
:param noisy_exceptions: Log exceptions that are normally considered
to be quiet/silent
:type noisy_exceptions: bool
:return: Nothing
"""
self.state.verbosity = verbosity
if fast and workers != 1:
raise RuntimeError("You cannot use both fast=True and workers=X")
if motd_display:
self.config.MOTD_DISPLAY.update(motd_display)
if reload_dir:
if isinstance(reload_dir, str):
reload_dir = [reload_dir]
for directory in reload_dir:
direc = Path(directory)
if not direc.is_dir():
logger.warning(
f"Directory {directory} could not be located"
)
self.state.reload_dirs.add(Path(directory))
if loop is not None:
raise TypeError(
"loop is not a valid argument. To use an existing loop, "
"change to create_server().\nSee more: "
"https://sanic.readthedocs.io/en/latest/sanic/deploying.html"
"#asynchronous-support"
)
if dev:
debug = True
auto_reload = True
if auto_reload and os.environ.get("SANIC_SERVER_RUNNING") != "true":
return reloader_helpers.watchdog(1.0, self)
if sock is None:
host, port = host or "127.0.0.1", port or 8000
if protocol is None:
protocol = (
WebSocketProtocol if self.websocket_enabled else HttpProtocol
)
# Set explicitly passed configuration values
for attribute, value in {
"ACCESS_LOG": access_log,
"AUTO_RELOAD": auto_reload,
"MOTD": motd,
"NOISY_EXCEPTIONS": noisy_exceptions,
}.items():
if value is not None:
setattr(self.config, attribute, value)
if fast:
self.state.fast = True
try:
workers = len(os.sched_getaffinity(0))
except AttributeError:
workers = os.cpu_count() or 1
server_settings = self._helper(
host=host,
port=port,
debug=debug,
ssl=ssl,
sock=sock,
unix=unix,
workers=workers,
protocol=protocol,
backlog=backlog,
register_sys_signals=register_sys_signals,
)
if self.config.USE_UVLOOP is True or (
self.config.USE_UVLOOP is _default and not OS_IS_WINDOWS
):
try_use_uvloop()
try:
self.is_running = True
self.is_stopping = False
if workers > 1 and os.name != "posix":
logger.warn(
f"Multiprocessing is currently not supported on {os.name},"
" using workers=1 instead"
)
workers = 1
if workers == 1:
serve_single(server_settings)
else:
serve_multiple(server_settings, workers)
except BaseException:
error_logger.exception(
"Experienced exception while trying to serve"
)
raise
finally:
self.is_running = False
logger.info("Server Stopped")
def stop(self):
"""
This kills the Sanic
"""
if not self.is_stopping:
self.shutdown_tasks(timeout=0)
self.is_stopping = True
get_event_loop().stop()
async def create_server(
self,
host: Optional[str] = None,
port: Optional[int] = None,
*,
debug: bool = False,
ssl: Union[None, SSLContext, dict, str, list, tuple] = None,
sock: Optional[socket] = None,
protocol: Type[Protocol] = None,
backlog: int = 100,
access_log: Optional[bool] = None,
unix: Optional[str] = None,
return_asyncio_server: bool = False,
asyncio_server_kwargs: Dict[str, Any] = None,
noisy_exceptions: Optional[bool] = None,
) -> Optional[AsyncioServer]:
"""
Asynchronous version of :func:`run`.
This method will take care of the operations necessary to invoke
the *before_start* events via :func:`trigger_events` method invocation
before starting the *sanic* app in Async mode.
.. note::
This does not support multiprocessing and is not the preferred
way to run a :class:`Sanic` application.
:param host: Address to host on
:type host: str
:param port: Port to host on
:type port: int
:param debug: Enables debug output (slows server)
:type debug: bool
:param ssl: SSLContext, or location of certificate and key
for SSL encryption of worker(s)
:type ssl: SSLContext or dict
:param sock: Socket for the server to accept connections from
:type sock: socket
:param protocol: Subclass of asyncio Protocol class
:type protocol: type[Protocol]
:param backlog: a number of unaccepted connections that the system
will allow before refusing new connections
:type backlog: int
:param access_log: Enables writing access logs (slows server)
:type access_log: bool
:param return_asyncio_server: flag that defines whether there's a need
to return asyncio.Server or
start it serving right away
:type return_asyncio_server: bool
:param asyncio_server_kwargs: key-value arguments for
asyncio/uvloop create_server method
:type asyncio_server_kwargs: dict
:param noisy_exceptions: Log exceptions that are normally considered
to be quiet/silent
:type noisy_exceptions: bool
:return: AsyncioServer if return_asyncio_server is true, else Nothing
"""
if sock is None:
host, port = host or "127.0.0.1", port or 8000
if protocol is None:
protocol = (
WebSocketProtocol if self.websocket_enabled else HttpProtocol
)
# Set explicitly passed configuration values
for attribute, value in {
"ACCESS_LOG": access_log,
"NOISY_EXCEPTIONS": noisy_exceptions,
}.items():
if value is not None:
setattr(self.config, attribute, value)
server_settings = self._helper(
host=host,
port=port,
debug=debug,
ssl=ssl,
sock=sock,
unix=unix,
loop=get_event_loop(),
protocol=protocol,
backlog=backlog,
run_async=return_asyncio_server,
)
if self.config.USE_UVLOOP is not _default:
error_logger.warning(
"You are trying to change the uvloop configuration, but "
"this is only effective when using the run(...) method. "
"When using the create_server(...) method Sanic will use "
"the already existing loop."
)
main_start = server_settings.pop("main_start", None)
main_stop = server_settings.pop("main_stop", None)
if main_start or main_stop:
logger.warning(
"Listener events for the main process are not available "
"with create_server()"
)
return await serve(
asyncio_server_kwargs=asyncio_server_kwargs, **server_settings
)
async def _run_request_middleware(
self, request, request_name=None
): # no cov
@ -1415,100 +1119,6 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
break
return response
def _helper(
self,
host: Optional[str] = None,
port: Optional[int] = None,
debug: bool = False,
ssl: Union[None, SSLContext, dict, str, list, tuple] = None,
sock: Optional[socket] = None,
unix: Optional[str] = None,
workers: int = 1,
loop: AbstractEventLoop = None,
protocol: Type[Protocol] = HttpProtocol,
backlog: int = 100,
register_sys_signals: bool = True,
run_async: bool = False,
):
"""Helper function used by `run` and `create_server`."""
if self.config.PROXIES_COUNT and self.config.PROXIES_COUNT < 0:
raise ValueError(
"PROXIES_COUNT cannot be negative. "
"https://sanic.readthedocs.io/en/latest/sanic/config.html"
"#proxy-configuration"
)
ssl = process_to_context(ssl)
self.debug = debug
self.state.host = host
self.state.port = port
self.state.workers = workers
self.state.ssl = ssl
self.state.unix = unix
self.state.sock = sock
server_settings = {
"protocol": protocol,
"host": host,
"port": port,
"sock": sock,
"unix": unix,
"ssl": ssl,
"app": self,
"signal": ServerSignal(),
"loop": loop,
"register_sys_signals": register_sys_signals,
"backlog": backlog,
}
self.motd(self.serve_location)
if sys.stdout.isatty() and not self.state.is_debug:
error_logger.warning(
f"{Colors.YELLOW}Sanic is running in PRODUCTION mode. "
"Consider using '--debug' or '--dev' while actively "
f"developing your application.{Colors.END}"
)
# Register start/stop events
for event_name, settings_name, reverse in (
("main_process_start", "main_start", False),
("main_process_stop", "main_stop", True),
):
listeners = self.listeners[event_name].copy()
if reverse:
listeners.reverse()
# Prepend sanic to the arguments when listeners are triggered
listeners = [partial(listener, self) for listener in listeners]
server_settings[settings_name] = listeners # type: ignore
if run_async:
server_settings["run_async"] = True
return server_settings
@property
def serve_location(self) -> str:
serve_location = ""
proto = "http"
if self.state.ssl is not None:
proto = "https"
if self.state.unix:
serve_location = f"{self.state.unix} {proto}://..."
elif self.state.sock:
serve_location = f"{self.state.sock.getsockname()} {proto}://..."
elif self.state.host and self.state.port:
# colon(:) is legal for a host only in an ipv6 address
display_host = (
f"[{self.state.host}]"
if ":" in self.state.host
else self.state.host
)
serve_location = f"{proto}://{display_host}:{self.state.port}"
return serve_location
def _build_endpoint_name(self, *parts):
parts = [self.name, *parts]
return ".".join(parts)
@ -1558,12 +1168,13 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
if not isinstance(task, Future):
prepped = cls._prep_task(task, app, loop)
if sys.version_info < (3, 8): # no cov
task = loop.create_task(prepped)
if name:
error_logger.warning(
"Cannot set a name for a task when using Python 3.7. "
"Your task will be created without a name."
)
task = loop.create_task(prepped)
task.get_name = lambda: name
else:
task = loop.create_task(prepped, name=name)
@ -1601,12 +1212,6 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
:param task: future, couroutine or awaitable
"""
if name and sys.version_info < (3, 8): # no cov
name = None
error_logger.warning(
"Cannot set a name for a task when using Python 3.7. Your "
"task will be created without a name."
)
try:
loop = self.loop # Will raise SanicError if loop is not started
return self._loop_add_task(
@ -1629,12 +1234,6 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
def get_task(
self, name: str, *, raise_exception: bool = True
) -> Optional[Task]:
if sys.version_info < (3, 8): # no cov
error_logger.warning(
"This feature (get_task) is only supported on using "
"Python 3.8+."
)
return
try:
return self._task_registry[name]
except KeyError:
@ -1651,12 +1250,6 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
*,
raise_exception: bool = True,
) -> None:
if sys.version_info < (3, 8): # no cov
error_logger.warning(
"This feature (cancel_task) is only supported on using "
"Python 3.8+."
)
return
task = self.get_task(name, raise_exception=raise_exception)
if task and not task.cancelled():
args: Tuple[str, ...] = ()
@ -1675,12 +1268,6 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
...
def purge_tasks(self):
if sys.version_info < (3, 8): # no cov
error_logger.warning(
"This feature (purge_tasks) is only supported on using "
"Python 3.8+."
)
return
for task in self.tasks:
if task.done() or task.cancelled():
name = task.get_name()
@ -1693,31 +1280,22 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
def shutdown_tasks(
self, timeout: Optional[float] = None, increment: float = 0.1
):
if sys.version_info < (3, 8):
error_logger.warning(
"This feature (shutdown_tasks) is only supported on using "
"Python 3.8+."
)
return
for task in self.tasks:
if task.get_name() != "RunServer":
task.cancel()
if timeout is None:
timeout = self.config.GRACEFUL_SHUTDOWN_TIMEOUT
while len(self._task_registry) and timeout:
self.loop.run_until_complete(asyncio.sleep(increment))
with suppress(RuntimeError):
running_loop = get_running_loop()
running_loop.run_until_complete(asyncio.sleep(increment))
self.purge_tasks()
timeout -= increment
@property
def tasks(self):
if sys.version_info < (3, 8): # no cov
error_logger.warning(
"This feature (tasks) is only supported on using "
"Python 3.8+."
)
return
return iter(self._task_registry.values())
# -------------------------------------------------------------------- #
@ -1767,6 +1345,13 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
@debug.setter
def debug(self, value: bool):
deprecation(
"Setting the value of a Sanic application's debug value directly "
"is deprecated and will be removed in v22.9. Please set it using "
"the CLI, app.run, app.prepare, or directly set "
"app.state.mode to Mode.DEBUG.",
22.9,
)
mode = Mode.DEBUG if value else Mode.PRODUCTION
self.state.mode = mode
@ -1784,80 +1369,60 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
@property
def is_running(self):
deprecation(
"Use of the is_running property is no longer used by Sanic "
"internally. The property is now deprecated and will be removed "
"in version 22.9. You may continue to set the property for your "
"own needs until that time. If you would like to check whether "
"the application is operational, please use app.state.stage. More "
"information is available at ___.",
22.9,
)
return self.state.is_running
@is_running.setter
def is_running(self, value: bool):
deprecation(
"Use of the is_running property is no longer used by Sanic "
"internally. The property is now deprecated and will be removed "
"in version 22.9. You may continue to set the property for your "
"own needs until that time. If you would like to check whether "
"the application is operational, please use app.state.stage. More "
"information is available at ___.",
22.9,
)
self.state.is_running = value
@property
def is_stopping(self):
deprecation(
"Use of the is_stopping property is no longer used by Sanic "
"internally. The property is now deprecated and will be removed "
"in version 22.9. You may continue to set the property for your "
"own needs until that time. If you would like to check whether "
"the application is operational, please use app.state.stage. More "
"information is available at ___.",
22.9,
)
return self.state.is_stopping
@is_stopping.setter
def is_stopping(self, value: bool):
deprecation(
"Use of the is_stopping property is no longer used by Sanic "
"internally. The property is now deprecated and will be removed "
"in version 22.9. You may continue to set the property for your "
"own needs until that time. If you would like to check whether "
"the application is operational, please use app.state.stage. More "
"information is available at ___.",
22.9,
)
self.state.is_stopping = value
@property
def reload_dirs(self):
return self.state.reload_dirs
def motd(self, serve_location):
if self.config.MOTD:
mode = [f"{self.state.mode},"]
if self.state.fast:
mode.append("goin' fast")
if self.state.asgi:
mode.append("ASGI")
else:
if self.state.workers == 1:
mode.append("single worker")
else:
mode.append(f"w/ {self.state.workers} workers")
display = {
"mode": " ".join(mode),
"server": self.state.server,
"python": platform.python_version(),
"platform": platform.platform(),
}
extra = {}
if self.config.AUTO_RELOAD:
reload_display = "enabled"
if self.state.reload_dirs:
reload_display += ", ".join(
[
"",
*(
str(path.absolute())
for path in self.state.reload_dirs
),
]
)
display["auto-reload"] = reload_display
packages = []
for package_name in SANIC_PACKAGES:
module_name = package_name.replace("-", "_")
try:
module = import_module(module_name)
packages.append(f"{package_name}=={module.__version__}")
except ImportError:
...
if packages:
display["packages"] = ", ".join(packages)
if self.config.MOTD_DISPLAY:
extra.update(self.config.MOTD_DISPLAY)
logo = (
get_logo(coffee=self.state.coffee)
if self.config.LOGO == "" or self.config.LOGO is True
else self.config.LOGO
)
MOTD.output(logo, serve_location, display, extra)
@property
def ext(self) -> Extend:
if not hasattr(self, "_ext"):
@ -1955,7 +1520,6 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
async def _startup(self):
self._future_registry.clear()
# Startup Sanic Extensions
if not hasattr(self, "_ext"):
setup_ext(self)
if hasattr(self, "_ext"):
@ -1978,6 +1542,9 @@ class Sanic(BaseSanic, metaclass=TouchUpMeta):
self.__class__._uvloop_setting = self.config.USE_UVLOOP
# Startup time optimizations
if self.state.primary:
# TODO:
# - Raise warning if secondary apps have error handler config
ErrorHandler.finalize(self.error_handler, config=self.config)
TouchUp.run(self)

View File

@ -3,16 +3,17 @@ from __future__ import annotations
import logging
from dataclasses import dataclass, field
from enum import Enum, auto
from enum import Enum, IntEnum, auto
from pathlib import Path
from socket import socket
from ssl import SSLContext
from typing import TYPE_CHECKING, Any, Optional, Set, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Union
from sanic.log import logger
from sanic.server.async_server import AsyncioServer
if TYPE_CHECKING:
if TYPE_CHECKING: # no cov
from sanic import Sanic
@ -32,6 +33,19 @@ class Mode(StrEnum):
DEBUG = auto()
class ServerStage(IntEnum):
STOPPED = auto()
PARTIAL = auto()
SERVING = auto()
@dataclass
class ApplicationServerInfo:
settings: Dict[str, Any]
stage: ServerStage = field(default=ServerStage.STOPPED)
server: Optional[AsyncioServer] = field(default=None)
@dataclass
class ApplicationState:
app: Sanic
@ -45,12 +59,15 @@ class ApplicationState:
unix: Optional[str] = field(default=None)
mode: Mode = field(default=Mode.PRODUCTION)
reload_dirs: Set[Path] = field(default_factory=set)
auto_reload: bool = field(default=False)
server: Server = field(default=Server.SANIC)
is_running: bool = field(default=False)
is_started: bool = field(default=False)
is_stopping: bool = field(default=False)
verbosity: int = field(default=0)
workers: int = field(default=0)
primary: bool = field(default=True)
server_info: List[ApplicationServerInfo] = field(default_factory=list)
# This property relates to the ApplicationState instance and should
# not be changed except in the __post_init__ method
@ -77,3 +94,17 @@ class ApplicationState:
@property
def is_debug(self):
return self.mode is Mode.DEBUG
@property
def stage(self) -> ServerStage:
if not self.server_info:
return ServerStage.STOPPED
if all(info.stage is ServerStage.SERVING for info in self.server_info):
return ServerStage.SERVING
elif any(
info.stage is ServerStage.SERVING for info in self.server_info
):
return ServerStage.PARTIAL
return ServerStage.STOPPED

View File

@ -172,12 +172,6 @@ Or, a path to a directory to run as a simple HTTP server:
kwargs[maybe_arg] = True
if self.args.path:
if self.args.auto_reload or self.args.debug:
kwargs["auto_reload"] = True
kwargs["reload_dir"] = self.args.path
else:
error_logger.warning(
"Ignoring '--reload-dir' since auto reloading was not "
"enabled. If you would like to watch directories for "
"changes, consider using --debug or --auto-reload."
)
return kwargs

695
sanic/mixins/runner.py Normal file
View File

@ -0,0 +1,695 @@
from __future__ import annotations
import os
import platform
import sys
from asyncio import (
AbstractEventLoop,
CancelledError,
Protocol,
all_tasks,
get_event_loop,
get_running_loop,
)
from contextlib import suppress
from functools import partial
from importlib import import_module
from pathlib import Path
from socket import socket
from ssl import SSLContext
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Type, Union
from sanic import reloader_helpers
from sanic.application.logo import get_logo
from sanic.application.motd import MOTD
from sanic.application.state import ApplicationServerInfo, Mode, ServerStage
from sanic.base.meta import SanicMeta
from sanic.compat import OS_IS_WINDOWS
from sanic.helpers import _default
from sanic.log import Colors, error_logger, logger
from sanic.models.handler_types import ListenerType
from sanic.server import Signal as ServerSignal
from sanic.server import try_use_uvloop
from sanic.server.async_server import AsyncioServer
from sanic.server.protocols.http_protocol import HttpProtocol
from sanic.server.protocols.websocket_protocol import WebSocketProtocol
from sanic.server.runners import serve, serve_multiple, serve_single
from sanic.tls import process_to_context
if TYPE_CHECKING: # no cov
from sanic import Sanic
from sanic.application.state import ApplicationState
from sanic.config import Config
SANIC_PACKAGES = ("sanic-routing", "sanic-testing", "sanic-ext")
class RunnerMixin(metaclass=SanicMeta):
_app_registry: Dict[str, Sanic]
config: Config
listeners: Dict[str, List[ListenerType[Any]]]
state: ApplicationState
websocket_enabled: bool
def make_coffee(self, *args, **kwargs):
self.state.coffee = True
self.run(*args, **kwargs)
def run(
self,
host: Optional[str] = None,
port: Optional[int] = None,
*,
dev: bool = False,
debug: bool = False,
auto_reload: Optional[bool] = None,
ssl: Union[None, SSLContext, dict, str, list, tuple] = None,
sock: Optional[socket] = None,
workers: int = 1,
protocol: Optional[Type[Protocol]] = None,
backlog: int = 100,
register_sys_signals: bool = True,
access_log: Optional[bool] = None,
unix: Optional[str] = None,
loop: AbstractEventLoop = None,
reload_dir: Optional[Union[List[str], str]] = None,
noisy_exceptions: Optional[bool] = None,
motd: bool = True,
fast: bool = False,
verbosity: int = 0,
motd_display: Optional[Dict[str, str]] = None,
) -> None:
"""
Run the HTTP Server and listen until keyboard interrupt or term
signal. On termination, drain connections before closing.
:param host: Address to host on
:type host: str
:param port: Port to host on
:type port: int
:param debug: Enables debug output (slows server)
:type debug: bool
:param auto_reload: Reload app whenever its source code is changed.
Enabled by default in debug mode.
:type auto_relaod: bool
:param ssl: SSLContext, or location of certificate and key
for SSL encryption of worker(s)
:type ssl: str, dict, SSLContext or list
:param sock: Socket for the server to accept connections from
:type sock: socket
:param workers: Number of processes received before it is respected
:type workers: int
:param protocol: Subclass of asyncio Protocol class
:type protocol: type[Protocol]
:param backlog: a number of unaccepted connections that the system
will allow before refusing new connections
:type backlog: int
:param register_sys_signals: Register SIG* events
:type register_sys_signals: bool
:param access_log: Enables writing access logs (slows server)
:type access_log: bool
:param unix: Unix socket to listen on instead of TCP port
:type unix: str
:param noisy_exceptions: Log exceptions that are normally considered
to be quiet/silent
:type noisy_exceptions: bool
:return: Nothing
"""
self.prepare(
host=host,
port=port,
dev=dev,
debug=debug,
auto_reload=auto_reload,
ssl=ssl,
sock=sock,
workers=workers,
protocol=protocol,
backlog=backlog,
register_sys_signals=register_sys_signals,
access_log=access_log,
unix=unix,
loop=loop,
reload_dir=reload_dir,
noisy_exceptions=noisy_exceptions,
motd=motd,
fast=fast,
verbosity=verbosity,
motd_display=motd_display,
)
self.__class__.serve(primary=self) # type: ignore
def prepare(
self,
host: Optional[str] = None,
port: Optional[int] = None,
*,
dev: bool = False,
debug: bool = False,
auto_reload: Optional[bool] = None,
ssl: Union[None, SSLContext, dict, str, list, tuple] = None,
sock: Optional[socket] = None,
workers: int = 1,
protocol: Optional[Type[Protocol]] = None,
backlog: int = 100,
register_sys_signals: bool = True,
access_log: Optional[bool] = None,
unix: Optional[str] = None,
loop: AbstractEventLoop = None,
reload_dir: Optional[Union[List[str], str]] = None,
noisy_exceptions: Optional[bool] = None,
motd: bool = True,
fast: bool = False,
verbosity: int = 0,
motd_display: Optional[Dict[str, str]] = None,
) -> None:
if dev:
debug = True
auto_reload = True
self.state.verbosity = verbosity
if not self.state.auto_reload:
self.state.auto_reload = bool(auto_reload)
if fast and workers != 1:
raise RuntimeError("You cannot use both fast=True and workers=X")
if motd_display:
self.config.MOTD_DISPLAY.update(motd_display)
if reload_dir:
if isinstance(reload_dir, str):
reload_dir = [reload_dir]
for directory in reload_dir:
direc = Path(directory)
if not direc.is_dir():
logger.warning(
f"Directory {directory} could not be located"
)
self.state.reload_dirs.add(Path(directory))
if loop is not None:
raise TypeError(
"loop is not a valid argument. To use an existing loop, "
"change to create_server().\nSee more: "
"https://sanic.readthedocs.io/en/latest/sanic/deploying.html"
"#asynchronous-support"
)
if (
self.__class__.should_auto_reload()
and os.environ.get("SANIC_SERVER_RUNNING") != "true"
): # no cov
return
if sock is None:
host, port = host or "127.0.0.1", port or 8000
if protocol is None:
protocol = (
WebSocketProtocol if self.websocket_enabled else HttpProtocol
)
# Set explicitly passed configuration values
for attribute, value in {
"ACCESS_LOG": access_log,
"AUTO_RELOAD": auto_reload,
"MOTD": motd,
"NOISY_EXCEPTIONS": noisy_exceptions,
}.items():
if value is not None:
setattr(self.config, attribute, value)
if fast:
self.state.fast = True
try:
workers = len(os.sched_getaffinity(0))
except AttributeError: # no cov
workers = os.cpu_count() or 1
server_settings = self._helper(
host=host,
port=port,
debug=debug,
ssl=ssl,
sock=sock,
unix=unix,
workers=workers,
protocol=protocol,
backlog=backlog,
register_sys_signals=register_sys_signals,
)
self.state.server_info.append(
ApplicationServerInfo(settings=server_settings)
)
if self.config.USE_UVLOOP is True or (
self.config.USE_UVLOOP is _default and not OS_IS_WINDOWS
):
try_use_uvloop()
async def create_server(
self,
host: Optional[str] = None,
port: Optional[int] = None,
*,
debug: bool = False,
ssl: Union[None, SSLContext, dict, str, list, tuple] = None,
sock: Optional[socket] = None,
protocol: Type[Protocol] = None,
backlog: int = 100,
access_log: Optional[bool] = None,
unix: Optional[str] = None,
return_asyncio_server: bool = False,
asyncio_server_kwargs: Dict[str, Any] = None,
noisy_exceptions: Optional[bool] = None,
) -> Optional[AsyncioServer]:
"""
Asynchronous version of :func:`run`.
This method will take care of the operations necessary to invoke
the *before_start* events via :func:`trigger_events` method invocation
before starting the *sanic* app in Async mode.
.. note::
This does not support multiprocessing and is not the preferred
way to run a :class:`Sanic` application.
:param host: Address to host on
:type host: str
:param port: Port to host on
:type port: int
:param debug: Enables debug output (slows server)
:type debug: bool
:param ssl: SSLContext, or location of certificate and key
for SSL encryption of worker(s)
:type ssl: SSLContext or dict
:param sock: Socket for the server to accept connections from
:type sock: socket
:param protocol: Subclass of asyncio Protocol class
:type protocol: type[Protocol]
:param backlog: a number of unaccepted connections that the system
will allow before refusing new connections
:type backlog: int
:param access_log: Enables writing access logs (slows server)
:type access_log: bool
:param return_asyncio_server: flag that defines whether there's a need
to return asyncio.Server or
start it serving right away
:type return_asyncio_server: bool
:param asyncio_server_kwargs: key-value arguments for
asyncio/uvloop create_server method
:type asyncio_server_kwargs: dict
:param noisy_exceptions: Log exceptions that are normally considered
to be quiet/silent
:type noisy_exceptions: bool
:return: AsyncioServer if return_asyncio_server is true, else Nothing
"""
if sock is None:
host, port = host or "127.0.0.1", port or 8000
if protocol is None:
protocol = (
WebSocketProtocol if self.websocket_enabled else HttpProtocol
)
# Set explicitly passed configuration values
for attribute, value in {
"ACCESS_LOG": access_log,
"NOISY_EXCEPTIONS": noisy_exceptions,
}.items():
if value is not None:
setattr(self.config, attribute, value)
server_settings = self._helper(
host=host,
port=port,
debug=debug,
ssl=ssl,
sock=sock,
unix=unix,
loop=get_event_loop(),
protocol=protocol,
backlog=backlog,
run_async=return_asyncio_server,
)
if self.config.USE_UVLOOP is not _default:
error_logger.warning(
"You are trying to change the uvloop configuration, but "
"this is only effective when using the run(...) method. "
"When using the create_server(...) method Sanic will use "
"the already existing loop."
)
main_start = server_settings.pop("main_start", None)
main_stop = server_settings.pop("main_stop", None)
if main_start or main_stop:
logger.warning(
"Listener events for the main process are not available "
"with create_server()"
)
return await serve(
asyncio_server_kwargs=asyncio_server_kwargs, **server_settings
)
def stop(self):
"""
This kills the Sanic
"""
if self.state.stage is not ServerStage.STOPPED:
self.shutdown_tasks(timeout=0)
for task in all_tasks():
with suppress(AttributeError):
if task.get_name() == "RunServer":
task.cancel()
get_event_loop().stop()
def _helper(
self,
host: Optional[str] = None,
port: Optional[int] = None,
debug: bool = False,
ssl: Union[None, SSLContext, dict, str, list, tuple] = None,
sock: Optional[socket] = None,
unix: Optional[str] = None,
workers: int = 1,
loop: AbstractEventLoop = None,
protocol: Type[Protocol] = HttpProtocol,
backlog: int = 100,
register_sys_signals: bool = True,
run_async: bool = False,
) -> Dict[str, Any]:
"""Helper function used by `run` and `create_server`."""
if self.config.PROXIES_COUNT and self.config.PROXIES_COUNT < 0:
raise ValueError(
"PROXIES_COUNT cannot be negative. "
"https://sanic.readthedocs.io/en/latest/sanic/config.html"
"#proxy-configuration"
)
ssl = process_to_context(ssl)
if not self.state.is_debug:
self.state.mode = Mode.DEBUG if debug else Mode.PRODUCTION
self.state.host = host or ""
self.state.port = port or 0
self.state.workers = workers
self.state.ssl = ssl
self.state.unix = unix
self.state.sock = sock
server_settings = {
"protocol": protocol,
"host": host,
"port": port,
"sock": sock,
"unix": unix,
"ssl": ssl,
"app": self,
"signal": ServerSignal(),
"loop": loop,
"register_sys_signals": register_sys_signals,
"backlog": backlog,
}
self.motd(self.serve_location)
if sys.stdout.isatty() and not self.state.is_debug:
error_logger.warning(
f"{Colors.YELLOW}Sanic is running in PRODUCTION mode. "
"Consider using '--debug' or '--dev' while actively "
f"developing your application.{Colors.END}"
)
# Register start/stop events
for event_name, settings_name, reverse in (
("main_process_start", "main_start", False),
("main_process_stop", "main_stop", True),
):
listeners = self.listeners[event_name].copy()
if reverse:
listeners.reverse()
# Prepend sanic to the arguments when listeners are triggered
listeners = [partial(listener, self) for listener in listeners]
server_settings[settings_name] = listeners # type: ignore
if run_async:
server_settings["run_async"] = True
return server_settings
def motd(self, serve_location):
if self.config.MOTD:
mode = [f"{self.state.mode},"]
if self.state.fast:
mode.append("goin' fast")
if self.state.asgi:
mode.append("ASGI")
else:
if self.state.workers == 1:
mode.append("single worker")
else:
mode.append(f"w/ {self.state.workers} workers")
display = {
"mode": " ".join(mode),
"server": self.state.server,
"python": platform.python_version(),
"platform": platform.platform(),
}
extra = {}
if self.config.AUTO_RELOAD:
reload_display = "enabled"
if self.state.reload_dirs:
reload_display += ", ".join(
[
"",
*(
str(path.absolute())
for path in self.state.reload_dirs
),
]
)
display["auto-reload"] = reload_display
packages = []
for package_name in SANIC_PACKAGES:
module_name = package_name.replace("-", "_")
try:
module = import_module(module_name)
packages.append(f"{package_name}=={module.__version__}")
except ImportError:
...
if packages:
display["packages"] = ", ".join(packages)
if self.config.MOTD_DISPLAY:
extra.update(self.config.MOTD_DISPLAY)
logo = (
get_logo(coffee=self.state.coffee)
if self.config.LOGO == "" or self.config.LOGO is True
else self.config.LOGO
)
MOTD.output(logo, serve_location, display, extra)
@property
def serve_location(self) -> str:
serve_location = ""
proto = "http"
if self.state.ssl is not None:
proto = "https"
if self.state.unix:
serve_location = f"{self.state.unix} {proto}://..."
elif self.state.sock:
serve_location = f"{self.state.sock.getsockname()} {proto}://..."
elif self.state.host and self.state.port:
# colon(:) is legal for a host only in an ipv6 address
display_host = (
f"[{self.state.host}]"
if ":" in self.state.host
else self.state.host
)
serve_location = f"{proto}://{display_host}:{self.state.port}"
return serve_location
@classmethod
def should_auto_reload(cls) -> bool:
return any(app.state.auto_reload for app in cls._app_registry.values())
@classmethod
def serve(cls, primary: Optional[Sanic] = None) -> None:
apps = list(cls._app_registry.values())
if not primary:
try:
primary = apps[0]
except IndexError:
raise RuntimeError("Did not find any applications.")
# We want to run auto_reload if ANY of the applications have it enabled
if (
cls.should_auto_reload()
and os.environ.get("SANIC_SERVER_RUNNING") != "true"
):
reload_dirs: Set[Path] = primary.state.reload_dirs.union(
*(app.state.reload_dirs for app in apps)
)
return reloader_helpers.watchdog(1.0, reload_dirs)
# This exists primarily for unit testing
if not primary.state.server_info: # no cov
for app in apps:
app.state.server_info.clear()
return
primary_server_info = primary.state.server_info[0]
primary.before_server_start(partial(primary._start_servers, apps=apps))
try:
primary_server_info.stage = ServerStage.SERVING
if primary.state.workers > 1 and os.name != "posix": # no cov
logger.warn(
f"Multiprocessing is currently not supported on {os.name},"
" using workers=1 instead"
)
primary.state.workers = 1
if primary.state.workers == 1:
serve_single(primary_server_info.settings)
elif primary.state.workers == 0:
raise RuntimeError("Cannot serve with no workers")
else:
serve_multiple(
primary_server_info.settings, primary.state.workers
)
except BaseException:
error_logger.exception(
"Experienced exception while trying to serve"
)
raise
finally:
primary_server_info.stage = ServerStage.STOPPED
logger.info("Server Stopped")
for app in apps:
app.state.server_info.clear()
app.router.reset()
app.signal_router.reset()
async def _start_servers(
self,
primary: Sanic,
_,
apps: List[Sanic],
) -> None:
for app in apps:
if (
app.name is not primary.name
and app.state.workers != primary.state.workers
and app.state.server_info
):
message = (
f"The primary application {repr(primary)} is running "
f"with {primary.state.workers} worker(s). All "
"application instances will run with the same number. "
f"You requested {repr(app)} to run with "
f"{app.state.workers} worker(s), which will be ignored "
"in favor of the primary application."
)
if sys.stdout.isatty():
message = "".join(
[
Colors.YELLOW,
message,
Colors.END,
]
)
error_logger.warning(message, exc_info=True)
for server_info in app.state.server_info:
if server_info.stage is not ServerStage.SERVING:
app.state.primary = False
handlers = [
*server_info.settings.pop("main_start", []),
*server_info.settings.pop("main_stop", []),
]
if handlers:
error_logger.warning(
f"Sanic found {len(handlers)} listener(s) on "
"secondary applications attached to the main "
"process. These will be ignored since main "
"process listeners can only be attached to your "
"primary application: "
f"{repr(primary)}"
)
if not server_info.settings["loop"]:
server_info.settings["loop"] = get_running_loop()
try:
server_info.server = await serve(
**server_info.settings,
run_async=True,
reuse_port=bool(primary.state.workers - 1),
)
except OSError as e: # no cov
first_message = (
"An OSError was detected on startup. "
"The encountered error was: "
)
second_message = str(e)
if sys.stdout.isatty():
message_parts = [
Colors.YELLOW,
first_message,
Colors.RED,
second_message,
Colors.END,
]
else:
message_parts = [first_message, second_message]
message = "".join(message_parts)
error_logger.warning(message, exc_info=True)
continue
primary.add_task(
self._run_server(app, server_info), name="RunServer"
)
async def _run_server(
self,
app: RunnerMixin,
server_info: ApplicationServerInfo,
) -> None:
try:
# We should never get to this point without a server
# This is primarily to keep mypy happy
if not server_info.server: # no cov
raise RuntimeError("Could not locate AsyncioServer")
if app.state.stage is ServerStage.STOPPED:
server_info.stage = ServerStage.SERVING
await server_info.server.startup()
await server_info.server.before_start()
await server_info.server.after_start()
await server_info.server.serve_forever()
except CancelledError:
# We should never get to this point without a server
# This is primarily to keep mypy happy
if not server_info.server: # no cov
raise RuntimeError("Could not locate AsyncioServer")
await server_info.server.before_stop()
await server_info.server.close()
await server_info.server.after_stop()
finally:
server_info.stage = ServerStage.STOPPED
server_info.server = None

View File

@ -1,3 +1,5 @@
from __future__ import annotations
from ssl import SSLObject
from types import SimpleNamespace
from typing import Any, Dict, Optional

View File

@ -77,7 +77,7 @@ def _check_file(filename, mtimes):
return need_reload
def watchdog(sleep_interval, app):
def watchdog(sleep_interval, reload_dirs):
"""Watch project files, restart worker process if a change happened.
:param sleep_interval: interval in second.
@ -100,7 +100,7 @@ def watchdog(sleep_interval, app):
changed = set()
for filename in itertools.chain(
_iter_module_files(),
*(d.glob("**/*") for d in app.reload_dirs),
*(d.glob("**/*") for d in reload_dirs),
):
try:
if _check_file(filename, mtimes):

View File

@ -360,8 +360,8 @@ class Request:
Covers NoAuth, Basic Auth, Bearer Token, Api Token authentication
schemas.
:return: A named tuple with token or username and password related
to request
:return: A Credentials object with token, or username and password
related to the request
"""
if self.parsed_credentials is None:
try:

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
if TYPE_CHECKING: # no cov
from sanic.app import Sanic
import asyncio

View File

@ -5,7 +5,7 @@ from typing import TYPE_CHECKING, Optional
from sanic.touchup.meta import TouchUpMeta
if TYPE_CHECKING:
if TYPE_CHECKING: # no cov
from sanic.app import Sanic
from asyncio import CancelledError

View File

@ -11,7 +11,7 @@ from sanic.server import HttpProtocol
from ..websockets.impl import WebsocketImplProtocol
if TYPE_CHECKING:
if TYPE_CHECKING: # no cov
from websockets import http11

View File

@ -132,7 +132,7 @@ def serve(
try:
http_server = loop.run_until_complete(server_coroutine)
except BaseException:
error_logger.exception("Unable to start server")
error_logger.exception("Unable to start server", exc_info=True)
return
# Ignore SIGINT when run_multiple

View File

@ -10,12 +10,14 @@ from .base import BaseScheme
class OptionalDispatchEvent(BaseScheme):
ident = "ODE"
SYNC_SIGNAL_NAMESPACES = "http."
def __init__(self, app) -> None:
super().__init__(app)
self._sync_events()
self._registered_events = [
signal.path for signal in app.signal_router.routes
signal.name for signal in app.signal_router.routes
]
def run(self, method, module_globals):
@ -31,6 +33,35 @@ class OptionalDispatchEvent(BaseScheme):
return exec_locals[method.__name__]
def _sync_events(self):
all_events = set()
app_events = {}
for app in self.app.__class__._app_registry.values():
if app.state.server_info:
app_events[app] = {
signal.name for signal in app.signal_router.routes
}
all_events.update(app_events[app])
for app, events in app_events.items():
missing = {
x
for x in all_events.difference(events)
if any(x.startswith(y) for y in self.SYNC_SIGNAL_NAMESPACES)
}
if missing:
was_finalized = app.signal_router.finalized
if was_finalized: # no cov
app.signal_router.reset()
for event in missing:
app.signal(event)(self.noop)
if was_finalized: # no cov
app.signal_router.finalize()
@staticmethod
async def noop(**_): # no cov
...
class RemoveDispatch(NodeTransformer):
def __init__(self, registered_events, verbosity: int = 0) -> None:

View File

@ -1,243 +0,0 @@
import asyncio
import logging
import os
import signal
import sys
import traceback
from gunicorn.workers import base # type: ignore
from sanic.compat import UVLOOP_INSTALLED
from sanic.log import logger
from sanic.server import HttpProtocol, Signal, serve, try_use_uvloop
from sanic.server.protocols.websocket_protocol import WebSocketProtocol
try:
import ssl # type: ignore
except ImportError: # no cov
ssl = None # type: ignore
if UVLOOP_INSTALLED: # no cov
try_use_uvloop()
class GunicornWorker(base.Worker):
http_protocol = HttpProtocol
websocket_protocol = WebSocketProtocol
def __init__(self, *args, **kw): # pragma: no cover
super().__init__(*args, **kw)
cfg = self.cfg
if cfg.is_ssl:
self.ssl_context = self._create_ssl_context(cfg)
else:
self.ssl_context = None
self.servers = {}
self.connections = set()
self.exit_code = 0
self.signal = Signal()
def init_process(self):
# create new event_loop after fork
asyncio.get_event_loop().close()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
super().init_process()
def run(self):
is_debug = self.log.loglevel == logging.DEBUG
protocol = (
self.websocket_protocol
if self.app.callable.websocket_enabled
else self.http_protocol
)
self._server_settings = self.app.callable._helper(
loop=self.loop,
debug=is_debug,
protocol=protocol,
ssl=self.ssl_context,
run_async=True,
)
self._server_settings["signal"] = self.signal
self._server_settings.pop("sock")
self._await(self.app.callable._startup())
self._await(
self.app.callable._server_event("init", "before", loop=self.loop)
)
main_start = self._server_settings.pop("main_start", None)
main_stop = self._server_settings.pop("main_stop", None)
if main_start or main_stop: # noqa
logger.warning(
"Listener events for the main process are not available "
"with GunicornWorker"
)
try:
self._await(self._run())
self.app.callable.is_running = True
self._await(
self.app.callable._server_event(
"init", "after", loop=self.loop
)
)
self.loop.run_until_complete(self._check_alive())
self._await(
self.app.callable._server_event(
"shutdown", "before", loop=self.loop
)
)
self.loop.run_until_complete(self.close())
except BaseException:
traceback.print_exc()
finally:
try:
self._await(
self.app.callable._server_event(
"shutdown", "after", loop=self.loop
)
)
except BaseException:
traceback.print_exc()
finally:
self.loop.close()
sys.exit(self.exit_code)
async def close(self):
if self.servers:
# stop accepting connections
self.log.info(
"Stopping server: %s, connections: %s",
self.pid,
len(self.connections),
)
for server in self.servers:
server.close()
await server.wait_closed()
self.servers.clear()
# prepare connections for closing
self.signal.stopped = True
for conn in self.connections:
conn.close_if_idle()
# gracefully shutdown timeout
start_shutdown = 0
graceful_shutdown_timeout = self.cfg.graceful_timeout
while self.connections and (
start_shutdown < graceful_shutdown_timeout
):
await asyncio.sleep(0.1)
start_shutdown = start_shutdown + 0.1
# Force close non-idle connection after waiting for
# graceful_shutdown_timeout
for conn in self.connections:
if hasattr(conn, "websocket") and conn.websocket:
conn.websocket.fail_connection(code=1001)
else:
conn.abort()
async def _run(self):
for sock in self.sockets:
state = dict(requests_count=0)
self._server_settings["host"] = None
self._server_settings["port"] = None
server = await serve(
sock=sock,
connections=self.connections,
state=state,
**self._server_settings
)
self.servers[server] = state
async def _check_alive(self):
# If our parent changed then we shut down.
pid = os.getpid()
try:
while self.alive:
self.notify()
req_count = sum(
self.servers[srv]["requests_count"] for srv in self.servers
)
if self.max_requests and req_count > self.max_requests:
self.alive = False
self.log.info(
"Max requests exceeded, shutting down: %s", self
)
elif pid == os.getpid() and self.ppid != os.getppid():
self.alive = False
self.log.info("Parent changed, shutting down: %s", self)
else:
await asyncio.sleep(1.0, loop=self.loop)
except (Exception, BaseException, GeneratorExit, KeyboardInterrupt):
pass
@staticmethod
def _create_ssl_context(cfg):
"""Creates SSLContext instance for usage in asyncio.create_server.
See ssl.SSLSocket.__init__ for more details.
"""
ctx = ssl.SSLContext(cfg.ssl_version)
ctx.load_cert_chain(cfg.certfile, cfg.keyfile)
ctx.verify_mode = cfg.cert_reqs
if cfg.ca_certs:
ctx.load_verify_locations(cfg.ca_certs)
if cfg.ciphers:
ctx.set_ciphers(cfg.ciphers)
return ctx
def init_signals(self):
# Set up signals through the event loop API.
self.loop.add_signal_handler(
signal.SIGQUIT, self.handle_quit, signal.SIGQUIT, None
)
self.loop.add_signal_handler(
signal.SIGTERM, self.handle_exit, signal.SIGTERM, None
)
self.loop.add_signal_handler(
signal.SIGINT, self.handle_quit, signal.SIGINT, None
)
self.loop.add_signal_handler(
signal.SIGWINCH, self.handle_winch, signal.SIGWINCH, None
)
self.loop.add_signal_handler(
signal.SIGUSR1, self.handle_usr1, signal.SIGUSR1, None
)
self.loop.add_signal_handler(
signal.SIGABRT, self.handle_abort, signal.SIGABRT, None
)
# Don't let SIGTERM and SIGUSR1 disturb active requests
# by interrupting system calls
signal.siginterrupt(signal.SIGTERM, False)
signal.siginterrupt(signal.SIGUSR1, False)
def handle_quit(self, sig, frame):
self.alive = False
self.app.callable.is_running = False
self.cfg.worker_int(self)
def handle_abort(self, sig, frame):
self.alive = False
self.exit_code = 1
self.cfg.worker_abort(self)
sys.exit(1)
def _await(self, coro):
fut = asyncio.ensure_future(coro, loop=self.loop)
self.loop.run_until_complete(fut)

View File

@ -175,6 +175,21 @@ def run_startup(caplog):
return run
@pytest.fixture
def run_multi(caplog):
def run(app, level=logging.DEBUG):
@app.after_server_start
async def stop(app, _):
app.stop()
with caplog.at_level(level):
Sanic.serve()
return caplog.record_tuples
return run
@pytest.fixture(scope="function")
def message_in_records():
def msg_in_log(records: List[LogRecord], msg: str):

View File

@ -197,7 +197,7 @@ def test_app_enable_websocket(app, websocket_enabled, enable):
assert app.websocket_enabled == True
@patch("sanic.app.WebSocketProtocol")
@patch("sanic.mixins.runner.WebSocketProtocol")
def test_app_websocket_parameters(websocket_protocol_mock, app):
app.config.WEBSOCKET_MAX_SIZE = 44
app.config.WEBSOCKET_PING_TIMEOUT = 48
@ -473,13 +473,14 @@ def test_custom_context():
assert app.ctx == ctx
def test_uvloop_config(app, monkeypatch):
@pytest.mark.parametrize("use", (False, True))
def test_uvloop_config(app, monkeypatch, use):
@app.get("/test")
def handler(request):
return text("ok")
try_use_uvloop = Mock()
monkeypatch.setattr(sanic.app, "try_use_uvloop", try_use_uvloop)
monkeypatch.setattr(sanic.mixins.runner, "try_use_uvloop", try_use_uvloop)
# Default config
app.test_client.get("/test")
@ -489,14 +490,13 @@ def test_uvloop_config(app, monkeypatch):
try_use_uvloop.assert_called_once()
try_use_uvloop.reset_mock()
app.config["USE_UVLOOP"] = False
app.config["USE_UVLOOP"] = use
app.test_client.get("/test")
try_use_uvloop.assert_not_called()
try_use_uvloop.reset_mock()
app.config["USE_UVLOOP"] = True
app.test_client.get("/test")
if use:
try_use_uvloop.assert_called_once()
else:
try_use_uvloop.assert_not_called()
def test_uvloop_cannot_never_called_with_create_server(caplog, monkeypatch):
@ -506,7 +506,7 @@ def test_uvloop_cannot_never_called_with_create_server(caplog, monkeypatch):
apps[2].config.USE_UVLOOP = True
try_use_uvloop = Mock()
monkeypatch.setattr(sanic.app, "try_use_uvloop", try_use_uvloop)
monkeypatch.setattr(sanic.mixins.runner, "try_use_uvloop", try_use_uvloop)
loop = asyncio.get_event_loop()
@ -569,3 +569,8 @@ def test_cannot_run_fast_and_workers(app):
message = "You cannot use both fast=True and workers=X"
with pytest.raises(RuntimeError, match=message):
app.run(fast=True, workers=4)
def test_no_workers(app):
with pytest.raises(RuntimeError, match="Cannot serve with no workers"):
app.run(workers=0)

View File

@ -118,10 +118,10 @@ def test_host_port_localhost(cmd):
command = ["sanic", "fake.server.app", *cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
firstline = lines[starting_line(lines) + 1]
expected = b"Goin' Fast @ http://localhost:9999"
assert exitcode != 1
assert firstline == b"Goin' Fast @ http://localhost:9999"
assert expected in lines, f"Lines found: {lines}\nErr output: {err}"
@pytest.mark.parametrize(
@ -135,10 +135,10 @@ def test_host_port_ipv4(cmd):
command = ["sanic", "fake.server.app", *cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
firstline = lines[starting_line(lines) + 1]
expected = b"Goin' Fast @ http://127.0.0.127:9999"
assert exitcode != 1
assert firstline == b"Goin' Fast @ http://127.0.0.127:9999"
assert expected in lines, f"Lines found: {lines}\nErr output: {err}"
@pytest.mark.parametrize(
@ -152,10 +152,10 @@ def test_host_port_ipv6_any(cmd):
command = ["sanic", "fake.server.app", *cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
firstline = lines[starting_line(lines) + 1]
expected = b"Goin' Fast @ http://[::]:9999"
assert exitcode != 1
assert firstline == b"Goin' Fast @ http://[::]:9999"
assert expected in lines, f"Lines found: {lines}\nErr output: {err}"
@pytest.mark.parametrize(
@ -169,10 +169,10 @@ def test_host_port_ipv6_loopback(cmd):
command = ["sanic", "fake.server.app", *cmd]
out, err, exitcode = capture(command)
lines = out.split(b"\n")
firstline = lines[starting_line(lines) + 1]
expected = b"Goin' Fast @ http://[::1]:9999"
assert exitcode != 1
assert firstline == b"Goin' Fast @ http://[::1]:9999"
assert expected in lines, f"Lines found: {lines}\nErr output: {err}"
@pytest.mark.parametrize(
@ -191,13 +191,13 @@ def test_num_workers(num, cmd):
out, err, exitcode = capture(command)
lines = out.split(b"\n")
worker_lines = [
line
for line in lines
if b"Starting worker" in line or b"Stopping worker" in line
]
if num == 1:
expected = b"mode: production, single worker"
else:
expected = (f"mode: production, w/ {num} workers").encode()
assert exitcode != 1
assert len(worker_lines) == num * 2, f"Lines found: {lines}"
assert expected in lines, f"Expected {expected}\nLines found: {lines}"
@pytest.mark.parametrize("cmd", ("--debug",))
@ -207,9 +207,11 @@ def test_debug(cmd):
lines = out.split(b"\n")
info = read_app_info(lines)
assert info["debug"] is True
assert info["auto_reload"] is False
assert "dev" not in info
assert info["debug"] is True, f"Lines found: {lines}\nErr output: {err}"
assert (
info["auto_reload"] is False
), f"Lines found: {lines}\nErr output: {err}"
assert "dev" not in info, f"Lines found: {lines}\nErr output: {err}"
@pytest.mark.parametrize("cmd", ("--dev", "-d"))
@ -219,8 +221,10 @@ def test_dev(cmd):
lines = out.split(b"\n")
info = read_app_info(lines)
assert info["debug"] is True
assert info["auto_reload"] is True
assert info["debug"] is True, f"Lines found: {lines}\nErr output: {err}"
assert (
info["auto_reload"] is True
), f"Lines found: {lines}\nErr output: {err}"
@pytest.mark.parametrize("cmd", ("--auto-reload", "-r"))
@ -230,9 +234,11 @@ def test_auto_reload(cmd):
lines = out.split(b"\n")
info = read_app_info(lines)
assert info["debug"] is False
assert info["auto_reload"] is True
assert "dev" not in info
assert info["debug"] is False, f"Lines found: {lines}\nErr output: {err}"
assert (
info["auto_reload"] is True
), f"Lines found: {lines}\nErr output: {err}"
assert "dev" not in info, f"Lines found: {lines}\nErr output: {err}"
@pytest.mark.parametrize(
@ -244,7 +250,9 @@ def test_access_logs(cmd, expected):
lines = out.split(b"\n")
info = read_app_info(lines)
assert info["access_log"] is expected
assert (
info["access_log"] is expected
), f"Lines found: {lines}\nErr output: {err}"
@pytest.mark.parametrize("cmd", ("--version", "-v"))
@ -269,4 +277,6 @@ def test_noisy_exceptions(cmd, expected):
lines = out.split(b"\n")
info = read_app_info(lines)
assert info["noisy_exceptions"] is expected
assert (
info["noisy_exceptions"] is expected
), f"Lines found: {lines}\nErr output: {err}"

View File

@ -301,6 +301,9 @@ def test_config_access_log_passing_in_run(app: Sanic):
app.run(port=1340, access_log=False)
assert app.config.ACCESS_LOG is False
app.router.reset()
app.signal_router.reset()
app.run(port=1340, access_log=True)
assert app.config.ACCESS_LOG is True
@ -420,3 +423,15 @@ def test_config_set_methods(app: Sanic, monkeypatch: MonkeyPatch):
app.config.update_config({"FOO": 10})
post_set.assert_called_once_with("FOO", 10)
def test_negative_proxy_count(app: Sanic):
app.config.PROXIES_COUNT = -1
message = (
"PROXIES_COUNT cannot be negative. "
"https://sanic.readthedocs.io/en/latest/sanic/config.html"
"#proxy-configuration"
)
with pytest.raises(ValueError, match=message):
app.prepare()

View File

@ -1,5 +1,4 @@
import logging
import warnings
import pytest

View File

@ -1,11 +1,15 @@
import logging
import os
import platform
import sys
from unittest.mock import Mock
from sanic import __version__
import pytest
from sanic import Sanic, __version__
from sanic.application.logo import BASE_LOGO
from sanic.application.motd import MOTDTTY
from sanic.application.motd import MOTD, MOTDTTY
def test_logo_base(app, run_startup):
@ -83,3 +87,25 @@ def test_motd_display(caplog):
"""
)
@pytest.mark.skipif(sys.version_info < (3, 8), reason="Not on 3.7")
def test_reload_dirs(app):
app.config.LOGO = None
app.config.AUTO_RELOAD = True
app.prepare(reload_dir="./", auto_reload=True, motd_display={"foo": "bar"})
existing = MOTD.output
MOTD.output = Mock()
app.motd("foo")
MOTD.output.assert_called_once()
assert (
MOTD.output.call_args.args[2]["auto-reload"]
== f"enabled, {os.getcwd()}"
)
assert MOTD.output.call_args.args[3] == {"foo": "bar"}
MOTD.output = existing
Sanic._app_registry = {}

207
tests/test_multi_serve.py Normal file
View File

@ -0,0 +1,207 @@
import logging
from unittest.mock import Mock
import pytest
from sanic import Sanic
from sanic.response import text
from sanic.server.async_server import AsyncioServer
from sanic.signals import Event
from sanic.touchup.schemes.ode import OptionalDispatchEvent
try:
from unittest.mock import AsyncMock
except ImportError:
from asyncmock import AsyncMock # type: ignore
@pytest.fixture
def app_one():
app = Sanic("One")
@app.get("/one")
async def one(request):
return text("one")
return app
@pytest.fixture
def app_two():
app = Sanic("Two")
@app.get("/two")
async def two(request):
return text("two")
return app
@pytest.fixture(autouse=True)
def clean():
Sanic._app_registry = {}
yield
def test_serve_same_app_multiple_tuples(app_one, run_multi):
app_one.prepare(port=23456)
app_one.prepare(port=23457)
logs = run_multi(app_one)
assert (
"sanic.root",
logging.INFO,
"Goin' Fast @ http://127.0.0.1:23456",
) in logs
assert (
"sanic.root",
logging.INFO,
"Goin' Fast @ http://127.0.0.1:23457",
) in logs
def test_serve_multiple_apps(app_one, app_two, run_multi):
app_one.prepare(port=23456)
app_two.prepare(port=23457)
logs = run_multi(app_one)
assert (
"sanic.root",
logging.INFO,
"Goin' Fast @ http://127.0.0.1:23456",
) in logs
assert (
"sanic.root",
logging.INFO,
"Goin' Fast @ http://127.0.0.1:23457",
) in logs
def test_listeners_on_secondary_app(app_one, app_two, run_multi):
app_one.prepare(port=23456)
app_two.prepare(port=23457)
before_start = AsyncMock()
after_start = AsyncMock()
before_stop = AsyncMock()
after_stop = AsyncMock()
app_two.before_server_start(before_start)
app_two.after_server_start(after_start)
app_two.before_server_stop(before_stop)
app_two.after_server_stop(after_stop)
run_multi(app_one)
before_start.assert_awaited_once()
after_start.assert_awaited_once()
before_stop.assert_awaited_once()
after_stop.assert_awaited_once()
@pytest.mark.parametrize(
"events",
(
(Event.HTTP_LIFECYCLE_BEGIN,),
(Event.HTTP_LIFECYCLE_BEGIN, Event.HTTP_LIFECYCLE_COMPLETE),
(
Event.HTTP_LIFECYCLE_BEGIN,
Event.HTTP_LIFECYCLE_COMPLETE,
Event.HTTP_LIFECYCLE_REQUEST,
),
),
)
def test_signal_synchronization(app_one, app_two, run_multi, events):
app_one.prepare(port=23456)
app_two.prepare(port=23457)
for event in events:
app_one.signal(event)(AsyncMock())
run_multi(app_one)
assert len(app_two.signal_router.routes) == len(events) + 1
signal_handlers = {
signal.handler
for signal in app_two.signal_router.routes
if signal.name.startswith("http")
}
assert len(signal_handlers) == 1
assert list(signal_handlers)[0] is OptionalDispatchEvent.noop
def test_warning_main_process_listeners_on_secondary(
app_one, app_two, run_multi
):
app_two.main_process_start(AsyncMock())
app_two.main_process_stop(AsyncMock())
app_one.prepare(port=23456)
app_two.prepare(port=23457)
log = run_multi(app_one)
message = (
f"Sanic found 2 listener(s) on "
"secondary applications attached to the main "
"process. These will be ignored since main "
"process listeners can only be attached to your "
"primary application: "
f"{repr(app_one)}"
)
assert ("sanic.error", logging.WARNING, message) in log
def test_no_applications():
Sanic._app_registry = {}
message = "Did not find any applications."
with pytest.raises(RuntimeError, match=message):
Sanic.serve()
def test_oserror_warning(app_one, app_two, run_multi, capfd):
orig = AsyncioServer.__await__
AsyncioServer.__await__ = Mock(side_effect=OSError("foo"))
app_one.prepare(port=23456, workers=2)
app_two.prepare(port=23457, workers=2)
run_multi(app_one)
captured = capfd.readouterr()
assert (
"An OSError was detected on startup. The encountered error was: foo"
) in captured.err
AsyncioServer.__await__ = orig
def test_running_multiple_offset_warning(app_one, app_two, run_multi, capfd):
app_one.prepare(port=23456, workers=2)
app_two.prepare(port=23457)
run_multi(app_one)
captured = capfd.readouterr()
assert (
f"The primary application {repr(app_one)} is running "
"with 2 worker(s). All "
"application instances will run with the same number. "
f"You requested {repr(app_two)} to run with "
"1 worker(s), which will be ignored "
"in favor of the primary application."
) in captured.err
def test_running_multiple_secondary(app_one, app_two, run_multi, capfd):
app_one.prepare(port=23456, workers=2)
app_two.prepare(port=23457)
before_start = AsyncMock()
app_two.before_server_start(before_start)
run_multi(app_one)
before_start.await_count == 2

View File

@ -132,11 +132,11 @@ def test_main_process_event(app, caplog):
logger.info("main_process_stop")
@app.main_process_start
def main_process_start(app, loop):
def main_process_start2(app, loop):
logger.info("main_process_start")
@app.main_process_stop
def main_process_stop(app, loop):
def main_process_stop2(app, loop):
logger.info("main_process_stop")
with caplog.at_level(logging.INFO):

71
tests/test_prepare.py Normal file
View File

@ -0,0 +1,71 @@
import logging
import os
from pathlib import Path
from unittest.mock import Mock
import pytest
from sanic import Sanic
from sanic.application.state import ApplicationServerInfo
@pytest.fixture(autouse=True)
def no_skip():
should_auto_reload = Sanic.should_auto_reload
Sanic.should_auto_reload = Mock(return_value=False)
yield
Sanic._app_registry = {}
Sanic.should_auto_reload = should_auto_reload
def get_primary(app: Sanic) -> ApplicationServerInfo:
return app.state.server_info[0]
def test_dev(app: Sanic):
app.prepare(dev=True)
assert app.state.is_debug
assert app.state.auto_reload
def test_motd_display(app: Sanic):
app.prepare(motd_display={"foo": "bar"})
assert app.config.MOTD_DISPLAY["foo"] == "bar"
del app.config.MOTD_DISPLAY["foo"]
@pytest.mark.parametrize("dirs", ("./foo", ("./foo", "./bar")))
def test_reload_dir(app: Sanic, dirs, caplog):
messages = []
with caplog.at_level(logging.WARNING):
app.prepare(reload_dir=dirs)
if isinstance(dirs, str):
dirs = (dirs,)
for d in dirs:
assert Path(d) in app.state.reload_dirs
messages.append(
f"Directory {d} could not be located",
)
for message in messages:
assert ("sanic.root", logging.WARNING, message) in caplog.record_tuples
def test_fast(app: Sanic, run_multi):
app.prepare(fast=True)
try:
workers = len(os.sched_getaffinity(0))
except AttributeError:
workers = os.cpu_count() or 1
assert app.state.fast
assert app.state.workers == workers
logs = run_multi(app, logging.INFO)
messages = [m[2] for m in logs]
assert f"mode: production, goin' fast w/ {workers} workers" in messages

View File

@ -35,7 +35,7 @@ def create_listener(listener_name, in_list):
def start_stop_app(random_name_app, **run_kwargs):
def stop_on_alarm(signum, frame):
raise KeyboardInterrupt("SIGINT for sanic to stop gracefully")
random_name_app.stop()
signal.signal(signal.SIGALRM, stop_on_alarm)
signal.alarm(1)
@ -130,6 +130,9 @@ async def test_trigger_before_events_create_server_missing_event(app):
def test_create_server_trigger_events(app):
"""Test if create_server can trigger server events"""
def stop_on_alarm(signum, frame):
raise KeyboardInterrupt("...")
flag1 = False
flag2 = False
flag3 = False
@ -137,8 +140,7 @@ def test_create_server_trigger_events(app):
async def stop(app, loop):
nonlocal flag1
flag1 = True
await asyncio.sleep(0.1)
app.stop()
signal.alarm(1)
async def before_stop(app, loop):
nonlocal flag2
@ -155,6 +157,8 @@ def test_create_server_trigger_events(app):
loop = asyncio.get_event_loop()
# Use random port for tests
signal.signal(signal.SIGALRM, stop_on_alarm)
with closing(socket()) as sock:
sock.bind(("127.0.0.1", 0))

View File

@ -1,5 +1,4 @@
import asyncio
import sys
from asyncio.tasks import Task
from unittest.mock import Mock, call
@ -7,9 +6,15 @@ from unittest.mock import Mock, call
import pytest
from sanic.app import Sanic
from sanic.application.state import ApplicationServerInfo, ServerStage
from sanic.response import empty
try:
from unittest.mock import AsyncMock
except ImportError:
from asyncmock import AsyncMock # type: ignore
pytestmark = pytest.mark.asyncio
@ -20,11 +25,14 @@ async def dummy(n=0):
@pytest.fixture(autouse=True)
def mark_app_running(app):
app.is_running = True
def mark_app_running(app: Sanic):
app.state.server_info.append(
ApplicationServerInfo(
stage=ServerStage.SERVING, settings={}, server=AsyncMock()
)
)
@pytest.mark.skipif(sys.version_info < (3, 8), reason="Not supported in 3.7")
async def test_add_task_returns_task(app: Sanic):
task = app.add_task(dummy())
@ -32,7 +40,6 @@ async def test_add_task_returns_task(app: Sanic):
assert len(app._task_registry) == 0
@pytest.mark.skipif(sys.version_info < (3, 8), reason="Not supported in 3.7")
async def test_add_task_with_name(app: Sanic):
task = app.add_task(dummy(), name="dummy")
@ -44,7 +51,6 @@ async def test_add_task_with_name(app: Sanic):
assert task in app._task_registry.values()
@pytest.mark.skipif(sys.version_info < (3, 8), reason="Not supported in 3.7")
async def test_cancel_task(app: Sanic):
task = app.add_task(dummy(3), name="dummy")
@ -62,7 +68,6 @@ async def test_cancel_task(app: Sanic):
assert task.cancelled()
@pytest.mark.skipif(sys.version_info < (3, 8), reason="Not supported in 3.7")
async def test_purge_tasks(app: Sanic):
app.add_task(dummy(3), name="dummy")
@ -75,7 +80,6 @@ async def test_purge_tasks(app: Sanic):
assert len(app._task_registry) == 0
@pytest.mark.skipif(sys.version_info < (3, 8), reason="Not supported in 3.7")
def test_shutdown_tasks_on_app_stop():
class TestSanic(Sanic):
shutdown_tasks = Mock()

View File

@ -72,14 +72,12 @@ def test_unix_socket_creation(caplog):
assert not os.path.exists(SOCKPATH)
def test_invalid_paths():
@pytest.mark.parametrize("path", (".", "no-such-directory/sanictest.sock"))
def test_invalid_paths(path):
app = Sanic(name=__name__)
with pytest.raises(FileExistsError):
app.run(unix=".")
with pytest.raises(FileNotFoundError):
app.run(unix="no-such-directory/sanictest.sock")
with pytest.raises((FileExistsError, FileNotFoundError)):
app.run(unix=path)
def test_dont_replace_file():
@ -201,7 +199,7 @@ async def test_zero_downtime():
for _ in range(40):
async with httpx.AsyncClient(transport=transport) as client:
r = await client.get("http://localhost/sleep/0.1")
assert r.status_code == 200
assert r.status_code == 200, r.content
assert r.text == "Slept 0.1 seconds.\n"
def spawn():
@ -209,6 +207,7 @@ async def test_zero_downtime():
sys.executable,
"-m",
"sanic",
"--debug",
"--unix",
SOCKPATH,
"examples.delayed_response.app",

View File

@ -1,200 +0,0 @@
import asyncio
import json
import shlex
import subprocess
import time
import urllib.request
from unittest import mock
import pytest
from sanic_testing.testing import ASGI_PORT as PORT
from sanic.app import Sanic
from sanic.worker import GunicornWorker
@pytest.fixture
def gunicorn_worker():
command = (
"gunicorn "
f"--bind 127.0.0.1:{PORT} "
"--worker-class sanic.worker.GunicornWorker "
"examples.hello_world:app"
)
worker = subprocess.Popen(shlex.split(command))
time.sleep(2)
yield
worker.kill()
@pytest.fixture
def gunicorn_worker_with_access_logs():
command = (
"gunicorn "
f"--bind 127.0.0.1:{PORT + 1} "
"--worker-class sanic.worker.GunicornWorker "
"examples.hello_world:app"
)
worker = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE)
time.sleep(2)
return worker
@pytest.fixture
def gunicorn_worker_with_env_var():
command = (
'env SANIC_ACCESS_LOG="False" '
"gunicorn "
f"--bind 127.0.0.1:{PORT + 2} "
"--worker-class sanic.worker.GunicornWorker "
"--log-level info "
"examples.hello_world:app"
)
worker = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE)
time.sleep(2)
return worker
def test_gunicorn_worker(gunicorn_worker):
with urllib.request.urlopen(f"http://localhost:{PORT}/") as f:
res = json.loads(f.read(100).decode())
assert res["test"]
def test_gunicorn_worker_no_logs(gunicorn_worker_with_env_var):
"""
if SANIC_ACCESS_LOG was set to False do not show access logs
"""
with urllib.request.urlopen(f"http://localhost:{PORT + 2}/") as _:
gunicorn_worker_with_env_var.kill()
logs = list(
filter(
lambda x: b"sanic.access" in x,
gunicorn_worker_with_env_var.stdout.read().split(b"\n"),
)
)
assert len(logs) == 0
def test_gunicorn_worker_with_logs(gunicorn_worker_with_access_logs):
"""
default - show access logs
"""
with urllib.request.urlopen(f"http://localhost:{PORT + 1}/") as _:
gunicorn_worker_with_access_logs.kill()
assert (
b"(sanic.access)[INFO][127.0.0.1"
in gunicorn_worker_with_access_logs.stdout.read()
)
class GunicornTestWorker(GunicornWorker):
def __init__(self):
self.app = mock.Mock()
self.app.callable = Sanic("test_gunicorn_worker")
self.servers = {}
self.exit_code = 0
self.cfg = mock.Mock()
self.notify = mock.Mock()
@pytest.fixture
def worker():
return GunicornTestWorker()
def test_worker_init_process(worker):
with mock.patch("sanic.worker.asyncio") as mock_asyncio:
try:
worker.init_process()
except TypeError:
pass
assert mock_asyncio.get_event_loop.return_value.close.called
assert mock_asyncio.new_event_loop.called
assert mock_asyncio.set_event_loop.called
def test_worker_init_signals(worker):
worker.loop = mock.Mock()
worker.init_signals()
assert worker.loop.add_signal_handler.called
def test_handle_abort(worker):
with mock.patch("sanic.worker.sys") as mock_sys:
worker.handle_abort(object(), object())
assert not worker.alive
assert worker.exit_code == 1
mock_sys.exit.assert_called_with(1)
def test_handle_quit(worker):
worker.handle_quit(object(), object())
assert not worker.alive
assert worker.exit_code == 0
async def _a_noop(*a, **kw):
pass
def test_run_max_requests_exceeded(worker):
loop = asyncio.new_event_loop()
worker.ppid = 1
worker.alive = True
sock = mock.Mock()
sock.cfg_addr = ("localhost", 8080)
worker.sockets = [sock]
worker.wsgi = mock.Mock()
worker.connections = set()
worker.log = mock.Mock()
worker.loop = loop
worker.servers = {
"server1": {"requests_count": 14},
"server2": {"requests_count": 15},
}
worker.max_requests = 10
worker._run = mock.Mock(wraps=_a_noop)
# exceeding request count
_runner = asyncio.ensure_future(worker._check_alive(), loop=loop)
loop.run_until_complete(_runner)
assert not worker.alive
worker.notify.assert_called_with()
worker.log.info.assert_called_with(
"Max requests exceeded, shutting " "down: %s", worker
)
def test_worker_close(worker):
loop = asyncio.new_event_loop()
asyncio.sleep = mock.Mock(wraps=_a_noop)
worker.ppid = 1
worker.pid = 2
worker.cfg.graceful_timeout = 1.0
worker.signal = mock.Mock()
worker.signal.stopped = False
worker.wsgi = mock.Mock()
conn = mock.Mock()
conn.websocket = mock.Mock()
conn.websocket.fail_connection = mock.Mock(wraps=_a_noop)
worker.connections = set([conn])
worker.log = mock.Mock()
worker.loop = loop
server = mock.Mock()
server.close = mock.Mock(wraps=lambda *a, **kw: None)
server.wait_closed = mock.Mock(wraps=_a_noop)
worker.servers = {server: {"requests_count": 14}}
worker.max_requests = 10
# close worker
_close = asyncio.ensure_future(worker.close(), loop=loop)
loop.run_until_complete(_close)
assert worker.signal.stopped
assert conn.websocket.fail_connection.called
assert len(worker.servers) == 0

View File

@ -7,6 +7,9 @@ setenv =
{py37,py38,py39,py310,pyNightly}-no-ext: SANIC_NO_UJSON=1
{py37,py38,py39,py310,pyNightly}-no-ext: SANIC_NO_UVLOOP=1
extras = test
allowlist_externals =
pytest
coverage
commands =
pytest {posargs:tests --cov sanic}
- coverage combine --append
@ -41,7 +44,7 @@ commands =
[testenv:docs]
platform = linux|linux2|darwin
whitelist_externals = make
allowlist_externals = make
extras = docs
commands =
make docs-test