Add spinner on startup delay

This commit is contained in:
Adam Hopkins 2022-01-20 13:46:42 +02:00
parent cab7453791
commit 1bb80ba6f3
No known key found for this signature in database
GPG Key ID: 9F85EE6C807303FB
6 changed files with 210 additions and 22 deletions

View File

@ -0,0 +1,88 @@
import os
import sys
import time
from contextlib import contextmanager
from curses.ascii import SP
from queue import Queue
from threading import Thread
if os.name == "nt":
import ctypes
import msvcrt
class _CursorInfo(ctypes.Structure):
_fields_ = [("size", ctypes.c_int), ("visible", ctypes.c_byte)]
class Spinner:
def __init__(self, message: str) -> None:
self.message = message
self.queue: Queue[int] = Queue()
self.spinner = self.cursor()
self.thread = Thread(target=self.run)
def start(self):
self.queue.put(1)
self.thread.start()
self.hide()
def run(self):
while self.queue.get():
output = f"\r{self.message} [{next(self.spinner)}]"
sys.stdout.write(output)
sys.stdout.flush()
time.sleep(0.1)
self.queue.put(1)
def stop(self):
self.queue.put(0)
self.thread.join()
self.show()
@staticmethod
def cursor():
while True:
for cursor in "|/-\\":
yield cursor
@staticmethod
def hide():
if os.name == "nt":
ci = _CursorInfo()
handle = ctypes.windll.kernel32.GetStdHandle(-11)
ctypes.windll.kernel32.GetConsoleCursorInfo(
handle, ctypes.byref(ci)
)
ci.visible = False
ctypes.windll.kernel32.SetConsoleCursorInfo(
handle, ctypes.byref(ci)
)
elif os.name == "posix":
sys.stdout.write("\033[?25l")
sys.stdout.flush()
@staticmethod
def show():
if os.name == "nt":
ci = _CursorInfo()
handle = ctypes.windll.kernel32.GetStdHandle(-11)
ctypes.windll.kernel32.GetConsoleCursorInfo(
handle, ctypes.byref(ci)
)
ci.visible = True
ctypes.windll.kernel32.SetConsoleCursorInfo(
handle, ctypes.byref(ci)
)
elif os.name == "posix":
sys.stdout.write("\033[?25h")
sys.stdout.flush()
@contextmanager
def loading(message: str = "Loading"):
spinner = Spinner(message)
spinner.start()
yield
spinner.stop()

View File

@ -143,6 +143,7 @@ Or, a path to a directory to run as a simple HTTP server:
" Example File: project/sanic_server.py -> app\n" " Example File: project/sanic_server.py -> app\n"
" Example Module: project.sanic_server.app" " Example Module: project.sanic_server.app"
) )
sys.exit(1)
else: else:
raise e raise e
return app return app

View File

@ -134,7 +134,6 @@ class SocketGroup(Group):
"--host", "--host",
dest="host", dest="host",
type=str, type=str,
default="127.0.0.1",
help="Host address [default 127.0.0.1]", help="Host address [default 127.0.0.1]",
) )
self.container.add_argument( self.container.add_argument(
@ -142,7 +141,6 @@ class SocketGroup(Group):
"--port", "--port",
dest="port", dest="port",
type=int, type=int,
default=8000,
help="Port to serve on [default 8000]", help="Port to serve on [default 8000]",
) )
self.container.add_argument( self.container.add_argument(

View File

@ -23,3 +23,9 @@ class Stage(Enum):
class HTTP(IntEnum): class HTTP(IntEnum):
VERSION_1 = 1 VERSION_1 = 1
VERSION_3 = 3 VERSION_3 = 3
def display(self) -> str:
value = str(self.value)
if value == 1:
value = "1.1"
return f"HTTP/{value}"

View File

@ -3,15 +3,16 @@ from __future__ import annotations
import os import os
import ssl import ssl
import subprocess import subprocess
import sys
from contextlib import suppress from contextlib import suppress
from inspect import currentframe, getframeinfo
from pathlib import Path from pathlib import Path
from ssl import SSLContext from ssl import SSLContext
from tempfile import mkdtemp from tempfile import mkdtemp
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Union from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Union
from sanic.application.constants import Mode from sanic.application.constants import Mode
from sanic.application.spinner import loading
from sanic.constants import DEFAULT_LOCAL_TLS_CERT, DEFAULT_LOCAL_TLS_KEY from sanic.constants import DEFAULT_LOCAL_TLS_CERT, DEFAULT_LOCAL_TLS_KEY
from sanic.exceptions import SanicException from sanic.exceptions import SanicException
from sanic.helpers import Default from sanic.helpers import Default
@ -283,6 +284,14 @@ def generate_local_certificate(
): ):
check_mkcert() check_mkcert()
if not key_path.parent.exists() or not cert_path.parent.exists():
raise SanicException(
f"Cannot generate certificate at [{key_path}, {cert_path}]. One "
"or more of the directories does not exist."
)
message = "Generating TLS certificate"
with loading(message):
cmd = [ cmd = [
"mkcert", "mkcert",
"-key-file", "-key-file",
@ -291,7 +300,16 @@ def generate_local_certificate(
str(cert_path), str(cert_path),
localhost, localhost,
] ]
subprocess.run(cmd, check=True) resp = subprocess.run(
cmd,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
sys.stdout.write("\r" + " " * (len(message) + 4))
sys.stdout.flush()
sys.stdout.write(resp.stdout)
def check_mkcert(): def check_mkcert():

View File

@ -26,8 +26,10 @@ from typing import (
Literal, Literal,
Optional, Optional,
Set, Set,
Tuple,
Type, Type,
Union, Union,
cast,
) )
from sanic import reloader_helpers from sanic import reloader_helpers
@ -38,7 +40,7 @@ from sanic.base.meta import SanicMeta
from sanic.compat import OS_IS_WINDOWS from sanic.compat import OS_IS_WINDOWS
from sanic.helpers import _default from sanic.helpers import _default
from sanic.http.constants import HTTP from sanic.http.constants import HTTP
from sanic.http.tls import process_to_context from sanic.http.tls import get_ssl_context, process_to_context
from sanic.log import Colors, error_logger, logger from sanic.log import Colors, error_logger, logger
from sanic.models.handler_types import ListenerType from sanic.models.handler_types import ListenerType
from sanic.server import Signal as ServerSignal from sanic.server import Signal as ServerSignal
@ -181,6 +183,11 @@ class RunnerMixin(metaclass=SanicMeta):
verbosity: int = 0, verbosity: int = 0,
motd_display: Optional[Dict[str, str]] = None, motd_display: Optional[Dict[str, str]] = None,
) -> None: ) -> None:
if version == 3 and self.state.server_info:
raise RuntimeError(
"Serving multiple HTTP/3 instances is not supported."
)
if dev: if dev:
debug = True debug = True
auto_reload = True auto_reload = True
@ -222,7 +229,7 @@ class RunnerMixin(metaclass=SanicMeta):
return return
if sock is None: if sock is None:
host, port = host or "127.0.0.1", port or 8000 host, port = self.get_address(host, port, version)
if protocol is None: if protocol is None:
protocol = ( protocol = (
@ -327,7 +334,7 @@ class RunnerMixin(metaclass=SanicMeta):
""" """
if sock is None: if sock is None:
host, port = host or "127.0.0.1", port or 8000 host, port = host, port = self.get_address(host, port)
if protocol is None: if protocol is None:
protocol = ( protocol = (
@ -411,13 +418,19 @@ class RunnerMixin(metaclass=SanicMeta):
"#proxy-configuration" "#proxy-configuration"
) )
if not self.state.is_debug:
self.state.mode = Mode.DEBUG if debug else Mode.PRODUCTION
if isinstance(version, int): if isinstance(version, int):
version = HTTP(version) version = HTTP(version)
ssl = process_to_context(ssl) ssl = process_to_context(ssl)
if version is HTTP.VERSION_3:
if not self.state.is_debug: # TODO:
self.state.mode = Mode.DEBUG if debug else Mode.PRODUCTION # - Add API option to allow localhost TLS also on HTTP/1.1
if TYPE_CHECKING:
self = cast(Sanic, self)
ssl = get_ssl_context(self, ssl)
self.state.host = host or "" self.state.host = host or ""
self.state.port = port or 0 self.state.port = port or 0
@ -441,7 +454,7 @@ class RunnerMixin(metaclass=SanicMeta):
"backlog": backlog, "backlog": backlog,
} }
self.motd(self.serve_location) self.motd(server_settings=server_settings)
if sys.stdout.isatty() and not self.state.is_debug: if sys.stdout.isatty() and not self.state.is_debug:
error_logger.warning( error_logger.warning(
@ -467,7 +480,16 @@ class RunnerMixin(metaclass=SanicMeta):
return server_settings return server_settings
def motd(self, serve_location): def motd(
self,
serve_location: str = "",
server_settings: Optional[Dict[str, Any]] = None,
):
if serve_location:
# TODO: Deprecation warning
...
else:
serve_location = self.get_server_location(server_settings)
if self.config.MOTD: if self.config.MOTD:
mode = [f"{self.state.mode},"] mode = [f"{self.state.mode},"]
if self.state.fast: if self.state.fast:
@ -480,9 +502,16 @@ class RunnerMixin(metaclass=SanicMeta):
else: else:
mode.append(f"w/ {self.state.workers} workers") mode.append(f"w/ {self.state.workers} workers")
server = ", ".join(
(
self.state.server,
server_settings["version"].display(), # type: ignore
)
)
display = { display = {
"mode": " ".join(mode), "mode": " ".join(mode),
"server": self.state.server, "server": server,
"python": platform.python_version(), "python": platform.python_version(),
"platform": platform.platform(), "platform": platform.platform(),
} }
@ -506,7 +535,9 @@ class RunnerMixin(metaclass=SanicMeta):
module_name = package_name.replace("-", "_") module_name = package_name.replace("-", "_")
try: try:
module = import_module(module_name) module = import_module(module_name)
packages.append(f"{package_name}=={module.__version__}") packages.append(
f"{package_name}=={module.__version__}" # type: ignore
)
except ImportError: except ImportError:
... ...
@ -526,6 +557,10 @@ class RunnerMixin(metaclass=SanicMeta):
@property @property
def serve_location(self) -> str: def serve_location(self) -> str:
# TODO:
# - Will show only the primary server information. The state needs to
# reflect only the first server_info.
# - Deprecate this property in favor of getter
serve_location = "" serve_location = ""
proto = "http" proto = "http"
if self.state.ssl is not None: if self.state.ssl is not None:
@ -545,6 +580,48 @@ class RunnerMixin(metaclass=SanicMeta):
return serve_location return serve_location
@staticmethod
def get_server_location(
server_settings: Optional[Dict[str, Any]] = None
) -> str:
# TODO:
# - Update server_settings to an obj
serve_location = ""
proto = "http"
if not server_settings:
return serve_location
if server_settings["ssl"] is not None:
proto = "https"
if server_settings["unix"]:
serve_location = f'{server_settings["unix"]} {proto}://...'
elif server_settings["sock"]:
serve_location = (
f'{server_settings["sock"].getsockname()} {proto}://...'
)
elif server_settings["host"] and server_settings["port"]:
# colon(:) is legal for a host only in an ipv6 address
display_host = (
f'[{server_settings["host"]}]'
if ":" in server_settings["host"]
else server_settings["host"]
)
serve_location = (
f'{proto}://{display_host}:{server_settings["port"]}'
)
return serve_location
@staticmethod
def get_address(
host: Optional[str],
port: Optional[int],
version: HTTPVersion = HTTP.VERSION_1,
) -> Tuple[str, int]:
host = host or "127.0.0.1"
port = port or (8443 if version == 3 else 8000)
return host, port
@classmethod @classmethod
def should_auto_reload(cls) -> bool: def should_auto_reload(cls) -> bool:
return any(app.state.auto_reload for app in cls._app_registry.values()) return any(app.state.auto_reload for app in cls._app_registry.values())