Add in sanic-routing branch

This commit is contained in:
Adam Hopkins 2021-01-28 12:33:09 +02:00
commit 65b76f2762
46 changed files with 1418 additions and 2198 deletions

View File

@ -6,12 +6,16 @@ sanic.app
.. automodule:: sanic.app .. automodule:: sanic.app
:members: :members:
:show-inheritance:
:inherited-members:
sanic.blueprints sanic.blueprints
---------------- ----------------
.. automodule:: sanic.blueprints .. automodule:: sanic.blueprints
:members: :members:
:show-inheritance:
:inherited-members:
sanic.blueprint_group sanic.blueprint_group
--------------------- ---------------------
@ -105,13 +109,6 @@ sanic.static
:members: :members:
:undoc-members: :undoc-members:
sanic.testing
-------------
.. automodule:: sanic.testing
:members:
:undoc-members:
sanic.views sanic.views
----------- -----------

View File

@ -1 +1 @@
__version__ = "20.12.0" __version__ = "21.3.0a1"

View File

@ -25,15 +25,35 @@ from typing import (
) )
from urllib.parse import urlencode, urlunparse from urllib.parse import urlencode, urlunparse
from sanic_routing.route import Route
from sanic import reloader_helpers from sanic import reloader_helpers
from sanic.asgi import ASGIApp from sanic.asgi import ASGIApp
from sanic.blueprint_group import BlueprintGroup from sanic.blueprint_group import BlueprintGroup
from sanic.blueprints import Blueprint from sanic.blueprints import Blueprint
from sanic.config import BASE_LOGO, Config from sanic.config import BASE_LOGO, Config
from sanic.constants import HTTP_METHODS from sanic.constants import HTTP_METHODS
from sanic.exceptions import SanicException, ServerError, URLBuildError from sanic.exceptions import (
InvalidUsage,
NotFound,
SanicException,
ServerError,
URLBuildError,
)
from sanic.handlers import ErrorHandler, ListenerType, MiddlewareType from sanic.handlers import ErrorHandler, ListenerType, MiddlewareType
from sanic.log import LOGGING_CONFIG_DEFAULTS, error_logger, logger from sanic.log import LOGGING_CONFIG_DEFAULTS, error_logger, logger
from sanic.mixins.base import BaseMixin
from sanic.mixins.exceptions import ExceptionMixin
from sanic.mixins.listeners import ListenerEvent, ListenerMixin
from sanic.mixins.middleware import MiddlewareMixin
from sanic.mixins.routes import RouteMixin
from sanic.models.futures import (
FutureException,
FutureListener,
FutureMiddleware,
FutureRoute,
FutureStatic,
)
from sanic.request import Request from sanic.request import Request
from sanic.response import BaseHTTPResponse, HTTPResponse from sanic.response import BaseHTTPResponse, HTTPResponse
from sanic.router import Router from sanic.router import Router
@ -45,12 +65,13 @@ from sanic.server import (
serve_multiple, serve_multiple,
) )
from sanic.static import register as static_register from sanic.static import register as static_register
from sanic.testing import SanicASGITestClient, SanicTestClient
from sanic.views import CompositionView from sanic.views import CompositionView
from sanic.websocket import ConnectionClosed, WebSocketProtocol from sanic.websocket import ConnectionClosed, WebSocketProtocol
class Sanic: class Sanic(
BaseMixin, RouteMixin, MiddlewareMixin, ListenerMixin, ExceptionMixin
):
""" """
The main application instance The main application instance
""" """
@ -70,8 +91,8 @@ class Sanic:
configure_logging: bool = True, configure_logging: bool = True,
register: Optional[bool] = None, register: Optional[bool] = None,
) -> None: ) -> None:
super().__init__()
# Get name from previous stack frame
if name is None: if name is None:
raise SanicException( raise SanicException(
"Sanic instance cannot be unnamed. " "Sanic instance cannot be unnamed. "
@ -83,7 +104,9 @@ class Sanic:
self.name = name self.name = name
self.asgi = False self.asgi = False
self.router = router or Router(self) self.router = router or Router(
exception=NotFound, method_handler_exception=NotFound
)
self.request_class = request_class self.request_class = request_class
self.error_handler = error_handler or ErrorHandler() self.error_handler = error_handler or ErrorHandler()
self.config = Config(load_env=load_env) self.config = Config(load_env=load_env)
@ -102,6 +125,8 @@ class Sanic:
self.websocket_tasks: Set[Future] = set() self.websocket_tasks: Set[Future] = set()
self.named_request_middleware: Dict[str, MiddlewareType] = {} self.named_request_middleware: Dict[str, MiddlewareType] = {}
self.named_response_middleware: Dict[str, MiddlewareType] = {} self.named_response_middleware: Dict[str, MiddlewareType] = {}
self._test_client = None
self._asgi_client = None
# Register alternative method names # Register alternative method names
self.go_fast = self.run self.go_fast = self.run
@ -151,28 +176,8 @@ class Sanic:
) )
# Decorator # Decorator
def listener(self, event: str): def _apply_listener(self, listener: FutureListener):
""" return self.register_listener(listener.listener, listener.event)
Create a listener from a decorated function.
To be used as a deocrator:
.. code-block:: python
@bp.listener("before_server_start")
async def before_server_start(app, loop):
...
`See user guide <https://sanicframework.org/guide/basics/listeners.html#listeners>`__
:param event: event to listen to
"""
def decorator(listener: Callable):
self.listeners[event].append(listener)
return listener
return decorator
def register_listener(self, listener: Callable, event: str) -> Any: def register_listener(self, listener: Callable, event: str) -> Any:
""" """
@ -183,473 +188,20 @@ class Sanic:
:return: listener :return: listener
""" """
return self.listener(event)(listener) try:
_event = ListenerEvent(event)
except ValueError:
valid = ", ".join(ListenerEvent.__members__.values())
raise InvalidUsage(f"Invalid event: {event}. Use one of: {valid}")
# Decorator self.listeners[_event].append(listener)
def route( return listener
self,
uri: str,
methods: Iterable[str] = frozenset({"GET"}),
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
stream: bool = False,
version: Optional[int] = None,
name: Optional[str] = None,
ignore_body: bool = False,
):
"""
Decorate a function to be registered as a route
:param uri: path of the URL def _apply_route(self, route: FutureRoute) -> Route:
:param methods: list or tuple of methods allowed return self.router.add(**route._asdict())
:param host: the host, if required
:param strict_slashes: whether to apply strict slashes to the route
:param stream: whether to allow the request to stream its body
:param version: route specific versioning
:param name: user defined route name for url_for
:param ignore_body: whether the handler should ignore request
body (eg. GET requests)
:return: tuple of routes, decorated function
"""
# Fix case where the user did not prefix the URL with a / def _apply_static(self, static: FutureStatic) -> Route:
# and will probably get confused as to why it's not working return static_register(self, static)
if not uri.startswith("/"):
uri = "/" + uri
if strict_slashes is None:
strict_slashes = self.strict_slashes
def response(handler):
if isinstance(handler, tuple):
# if a handler fn is already wrapped in a route, the handler
# variable will be a tuple of (existing routes, handler fn)
routes, handler = handler
else:
routes = []
args = list(signature(handler).parameters.keys())
if not args:
handler_name = handler.__name__
raise ValueError(
f"Required parameter `request` missing "
f"in the {handler_name}() route?"
)
if stream:
handler.is_stream = stream
routes.extend(
self.router.add(
uri=uri,
methods=methods,
handler=handler,
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
ignore_body=ignore_body,
)
)
return routes, handler
return response
# Shorthand method decorators
def get(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
ignore_body: bool = True,
):
"""
Add an API URL under the **GET** *HTTP* method
:param uri: URL to be tagged to **GET** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"GET"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
ignore_body=ignore_body,
)
def post(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
stream: bool = False,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Add an API URL under the **POST** *HTTP* method
:param uri: URL to be tagged to **POST** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"POST"}),
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)
def put(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
stream: bool = False,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Add an API URL under the **PUT** *HTTP* method
:param uri: URL to be tagged to **PUT** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"PUT"}),
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)
def head(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
ignore_body: bool = True,
):
"""
Add an API URL under the **HEAD** *HTTP* method
:param uri: URL to be tagged to **HEAD** method of *HTTP*
:type uri: str
:param host: Host IP or FQDN for the service to use
:type host: Optional[str], optional
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:type strict_slashes: Optional[bool], optional
:param version: API Version
:type version: Optional[str], optional
:param name: Unique name that can be used to identify the Route
:type name: Optional[str], optional
:param ignore_body: whether the handler should ignore request
body (eg. GET requests), defaults to True
:type ignore_body: bool, optional
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"HEAD"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
ignore_body=ignore_body,
)
def options(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
ignore_body: bool = True,
):
"""
Add an API URL under the **OPTIONS** *HTTP* method
:param uri: URL to be tagged to **OPTIONS** method of *HTTP*
:type uri: str
:param host: Host IP or FQDN for the service to use
:type host: Optional[str], optional
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:type strict_slashes: Optional[bool], optional
:param version: API Version
:type version: Optional[str], optional
:param name: Unique name that can be used to identify the Route
:type name: Optional[str], optional
:param ignore_body: whether the handler should ignore request
body (eg. GET requests), defaults to True
:type ignore_body: bool, optional
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"OPTIONS"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
ignore_body=ignore_body,
)
def patch(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
stream=False,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Add an API URL under the **PATCH** *HTTP* method
:param uri: URL to be tagged to **PATCH** method of *HTTP*
:type uri: str
:param host: Host IP or FQDN for the service to use
:type host: Optional[str], optional
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:type strict_slashes: Optional[bool], optional
:param stream: whether to allow the request to stream its body
:type stream: Optional[bool], optional
:param version: API Version
:type version: Optional[str], optional
:param name: Unique name that can be used to identify the Route
:type name: Optional[str], optional
:param ignore_body: whether the handler should ignore request
body (eg. GET requests), defaults to True
:type ignore_body: bool, optional
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"PATCH"}),
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)
def delete(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
ignore_body: bool = True,
):
"""
Add an API URL under the **DELETE** *HTTP* method
:param uri: URL to be tagged to **DELETE** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"DELETE"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
ignore_body=ignore_body,
)
def add_route(
self,
handler,
uri: str,
methods: Iterable[str] = frozenset({"GET"}),
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
stream: bool = False,
):
"""A helper method to register class instance or
functions as a handler to the application url
routes.
:param handler: function or class instance
:param uri: path of the URL
:param methods: list or tuple of methods allowed, these are overridden
if using a HTTPMethodView
:param host:
:param strict_slashes:
:param version:
:param name: user defined route name for url_for
:param stream: boolean specifying if the handler is a stream handler
:return: function or class instance
"""
# Handle HTTPMethodView differently
if hasattr(handler, "view_class"):
methods = set()
for method in HTTP_METHODS:
_handler = getattr(handler.view_class, method.lower(), None)
if _handler:
methods.add(method)
if hasattr(_handler, "is_stream"):
stream = True
# handle composition view differently
if isinstance(handler, CompositionView):
methods = handler.handlers.keys()
for _handler in handler.handlers.values():
if hasattr(_handler, "is_stream"):
stream = True
break
if strict_slashes is None:
strict_slashes = self.strict_slashes
self.route(
uri=uri,
methods=methods,
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)(handler)
return handler
def websocket(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
subprotocols=None,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Decorate a function to be registered as a websocket route
:param uri: path of the URL
:param host: Host IP or FQDN details
:param strict_slashes: If the API endpoint needs to terminate
with a "/" or not
:param subprotocols: optional list of str with supported subprotocols
:param name: A unique name assigned to the URL so that it can
be used with :func:`url_for`
:return: tuple of routes, decorated function
"""
self.enable_websocket()
# Fix case where the user did not prefix the URL with a /
# and will probably get confused as to why it's not working
if not uri.startswith("/"):
uri = "/" + uri
if strict_slashes is None:
strict_slashes = self.strict_slashes
def response(handler):
if isinstance(handler, tuple):
# if a handler fn is already wrapped in a route, the handler
# variable will be a tuple of (existing routes, handler fn)
routes, handler = handler
else:
routes = []
websocket_handler = partial(
self._websocket_handler, handler, subprotocols=subprotocols
)
websocket_handler.__name__ = (
"websocket_handler_" + handler.__name__
)
websocket_handler.is_websocket = True
routes.extend(
self.router.add(
uri=uri,
handler=websocket_handler,
methods=frozenset({"GET"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
)
)
return routes, handler
return response
def add_websocket_route(
self,
handler,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
subprotocols=None,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
A helper method to register a function as a websocket route.
:param handler: a callable function or instance of a class
that can handle the websocket request
:param host: Host IP or FQDN details
:param uri: URL path that will be mapped to the websocket
handler
handler
:param strict_slashes: If the API endpoint needs to terminate
with a "/" or not
:param subprotocols: Subprotocols to be used with websocket
handshake
:param name: A unique name assigned to the URL so that it can
be used with :func:`url_for`
:return: Objected decorated by :func:`websocket`
"""
if strict_slashes is None:
strict_slashes = self.strict_slashes
return self.websocket(
uri,
host=host,
strict_slashes=strict_slashes,
subprotocols=subprotocols,
version=version,
name=name,
)(handler)
def enable_websocket(self, enable: bool = True): def enable_websocket(self, enable: bool = True):
""" """
@ -665,25 +217,22 @@ class Sanic:
self.websocket_enabled = enable self.websocket_enabled = enable
def exception(self, *exceptions): # Decorator
""" def _apply_exception_handler(self, handler: FutureException):
Decorate a function to be registered as a handler for exceptions """Decorate a function to be registered as a handler for exceptions
:param exceptions: exceptions :param exceptions: exceptions
:return: decorated function :return: decorated function
""" """
def response(handler): for exception in handler.exceptions:
for exception in exceptions:
if isinstance(exception, (tuple, list)): if isinstance(exception, (tuple, list)):
for e in exception: for e in exception:
self.error_handler.add(e, handler) self.error_handler.add(e, handler.handler)
else: else:
self.error_handler.add(exception, handler) self.error_handler.add(exception, handler.handler)
return handler return handler
return response
def register_middleware(self, middleware, attach_to: str = "request"): def register_middleware(self, middleware, attach_to: str = "request"):
""" """
Register an application level middleware that will be attached Register an application level middleware that will be attached
@ -738,77 +287,20 @@ class Sanic:
if middleware not in self.named_response_middleware[_rn]: if middleware not in self.named_response_middleware[_rn]:
self.named_response_middleware[_rn].appendleft(middleware) self.named_response_middleware[_rn].appendleft(middleware)
def middleware(self, middleware_or_request): # Decorator
""" def _apply_middleware(
Decorate and register middleware to be called before a request.
Can either be called as *@app.middleware* or
*@app.middleware('request')*
`See user guide <https://sanicframework.org/guide/basics/middleware.html>`__
:param: middleware_or_request: Optional parameter to use for
identifying which type of middleware is being registered.
"""
# Detect which way this was called, @middleware or @middleware('AT')
if callable(middleware_or_request):
return self.register_middleware(middleware_or_request)
else:
return partial(
self.register_middleware, attach_to=middleware_or_request
)
def static(
self, self,
uri: str, middleware: FutureMiddleware,
file_or_directory: str, route_names: Optional[List[str]] = None,
pattern=r"/?.+",
use_modified_since: bool = True,
use_content_range: bool = False,
stream_large_files: bool = False,
name: str = "static",
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
content_type: str = None,
): ):
""" print(f"{middleware=}")
Register a root to serve files from. The input can either be a if route_names:
file or a directory. This method will enable an easy and simple way return self.register_named_middleware(
to setup the :class:`Route` necessary to serve the static files. middleware.middleware, route_names, middleware.attach_to
)
:param uri: URL path to be used for serving static content else:
:param file_or_directory: Path for the Static file/directory with return self.register_middleware(
static files middleware.middleware, middleware.attach_to
:param pattern: Regex Pattern identifying the valid static files
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the
:func:`StreamingHTTPResponse.file_stream` handler rather
than the :func:`HTTPResponse.file` handler to send the file.
If this is an integer, this represents the threshold size to
switch to :func:`StreamingHTTPResponse.file_stream`
:param name: user defined name used for url_for
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param content_type: user defined content type for header
:return: routes registered on the router
:rtype: List[sanic.router.Route]
"""
return static_register(
self,
uri,
file_or_directory,
pattern,
use_modified_since,
use_content_range,
stream_large_files,
name,
host,
strict_slashes,
content_type,
) )
def blueprint(self, blueprint, **options): def blueprint(self, blueprint, **options):
@ -1125,7 +617,12 @@ class Sanic:
@property @property
def test_client(self): def test_client(self):
return SanicTestClient(self) if self._test_client:
return self._test_client
from sanic_testing.testing import SanicTestClient # type: ignore
self._test_client = SanicTestClient(self)
return self._test_client
@property @property
def asgi_client(self): def asgi_client(self):
@ -1136,7 +633,12 @@ class Sanic:
:return: testing client :return: testing client
:rtype: :class:`SanicASGITestClient` :rtype: :class:`SanicASGITestClient`
""" """
return SanicASGITestClient(self) if self._asgi_client:
return self._asgi_client
from sanic_testing.testing import SanicASGITestClient # type: ignore
self._asgi_client = SanicASGITestClient(self)
return self._asgi_client
# -------------------------------------------------------------------- # # -------------------------------------------------------------------- #
# Execution # Execution
@ -1414,6 +916,9 @@ class Sanic:
auto_reload=False, auto_reload=False,
): ):
"""Helper function used by `run` and `create_server`.""" """Helper function used by `run` and `create_server`."""
self.router.finalize()
if isinstance(ssl, dict): if isinstance(ssl, dict):
# try common aliaseses # try common aliaseses
cert = ssl.get("cert") or ssl.get("certificate") cert = ssl.get("cert") or ssl.get("certificate")

View File

@ -144,10 +144,13 @@ class BlueprintGroup(MutableSequence):
:param kwargs: Optional Keyword arg to use with Middleware :param kwargs: Optional Keyword arg to use with Middleware
:return: Partial function to apply the middleware :return: Partial function to apply the middleware
""" """
kwargs["bp_group"] = True
def register_middleware_for_blueprints(fn): def register_middleware_for_blueprints(fn):
for blueprint in self.blueprints: for blueprint in self.blueprints:
blueprint.middleware(fn, *args, **kwargs) blueprint.middleware(fn, *args, **kwargs)
if args and callable(args[0]):
fn = args[0]
args = list(args)[1:]
return register_middleware_for_blueprints(fn)
return register_middleware_for_blueprints return register_middleware_for_blueprints

View File

@ -2,11 +2,17 @@ from collections import defaultdict, namedtuple
from typing import Iterable, Optional from typing import Iterable, Optional
from sanic.blueprint_group import BlueprintGroup from sanic.blueprint_group import BlueprintGroup
from sanic.constants import HTTP_METHODS from sanic.mixins.base import BaseMixin
from sanic.views import CompositionView from sanic.mixins.exceptions import ExceptionMixin
from sanic.mixins.listeners import ListenerMixin
from sanic.mixins.middleware import MiddlewareMixin
from sanic.mixins.routes import RouteMixin
from sanic.models.futures import FutureRoute, FutureStatic
class Blueprint: class Blueprint(
BaseMixin, RouteMixin, MiddlewareMixin, ListenerMixin, ExceptionMixin
):
""" """
In *Sanic* terminology, a **Blueprint** is a logical collection of In *Sanic* terminology, a **Blueprint** is a logical collection of
URLs that perform a specific set of tasks which can be identified by URLs that perform a specific set of tasks which can be identified by
@ -46,6 +52,26 @@ class Blueprint:
self.version = version self.version = version
self.strict_slashes = strict_slashes self.strict_slashes = strict_slashes
def route(self, *args, **kwargs):
kwargs["apply"] = False
return super().route(*args, **kwargs)
def static(self, *args, **kwargs):
kwargs["apply"] = False
return super().static(*args, **kwargs)
def middleware(self, *args, **kwargs):
kwargs["apply"] = False
return super().middleware(*args, **kwargs)
def listener(self, *args, **kwargs):
kwargs["apply"] = False
return super().listener(*args, **kwargs)
def exception(self, *args, **kwargs):
kwargs["apply"] = False
return super().exception(*args, **kwargs)
@staticmethod @staticmethod
def group(*blueprints, url_prefix=""): def group(*blueprints, url_prefix=""):
""" """
@ -89,532 +115,49 @@ class Blueprint:
routes = [] routes = []
# Routes # Routes
for future in self.routes: for future in self._future_routes:
# attach the blueprint name to the handler so that it can be # attach the blueprint name to the handler so that it can be
# prefixed properly in the router # prefixed properly in the router
future.handler.__blueprintname__ = self.name future.handler.__blueprintname__ = self.name
# Prepend the blueprint URI prefix if available # Prepend the blueprint URI prefix if available
uri = url_prefix + future.uri if url_prefix else future.uri uri = url_prefix + future.uri if url_prefix else future.uri
version = future.version or self.version apply_route = FutureRoute(
future.handler,
uri[1:] if uri.startswith("//") else uri,
future.methods,
future.host or self.host,
future.strict_slashes,
future.stream,
future.version or self.version,
future.name,
future.ignore_body,
)
_routes, _ = app.route( route = app._apply_route(apply_route)
uri=uri[1:] if uri.startswith("//") else uri, routes.append(route)
methods=future.methods,
host=future.host or self.host,
strict_slashes=future.strict_slashes,
stream=future.stream,
version=version,
name=future.name,
)(future.handler)
if _routes:
routes += _routes
for future in self.websocket_routes:
# attach the blueprint name to the handler so that it can be
# prefixed properly in the router
future.handler.__blueprintname__ = self.name
# Prepend the blueprint URI prefix if available
uri = url_prefix + future.uri if url_prefix else future.uri
_routes, _ = app.websocket(
uri=uri,
host=future.host or self.host,
strict_slashes=future.strict_slashes,
name=future.name,
)(future.handler)
if _routes:
routes += _routes
# Static Files # Static Files
for future in self.statics: for future in self._future_statics:
# Prepend the blueprint URI prefix if available # Prepend the blueprint URI prefix if available
uri = url_prefix + future.uri if url_prefix else future.uri uri = url_prefix + future.uri if url_prefix else future.uri
_routes = app.static( apply_route = FutureStatic(uri, *future[1:])
uri, future.file_or_directory, *future.args, **future.kwargs route = app._apply_static(apply_route)
) routes.append(route)
if _routes:
routes += _routes
route_names = [route.name for route in routes if route] route_names = [route.name for route in routes if route]
# Middleware # Middleware
for future in self.middlewares: for future in self._future_middleware:
if future.args or future.kwargs: app._apply_middleware(future, route_names)
app.register_named_middleware(
future.middleware,
route_names,
*future.args,
**future.kwargs,
)
else:
app.register_named_middleware(future.middleware, route_names)
# Exceptions # Exceptions
for future in self.exceptions: for future in self._future_exceptions:
app.exception(*future.args, **future.kwargs)(future.handler) app._apply_exception_handler(future)
# Event listeners # Event listeners
for event, listeners in self.listeners.items(): for listener in self._future_listeners:
for listener in listeners: app._apply_listener(listener)
app.listener(event)(listener)
def route( def _generate_name(self, handler, name: str) -> str:
self, return f"{self.name}.{name or handler.__name__}"
uri: str,
methods: Iterable[str] = frozenset({"GET"}),
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
stream: bool = False,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Create a blueprint route from a decorated function.
:param uri: endpoint at which the route will be accessible.
:param methods: list of acceptable HTTP methods.
:param host: IP Address of FQDN for the sanic server to use.
:param strict_slashes: Enforce the API urls are requested with a
training */*
:param stream: If the route should provide a streaming support
:param version: Blueprint Version
:param name: Unique name to identify the Route
:return a decorated method that when invoked will return an object
of type :class:`FutureRoute`
"""
if strict_slashes is None:
strict_slashes = self.strict_slashes
def decorator(handler):
route = FutureRoute(
handler,
uri,
methods,
host,
strict_slashes,
stream,
version,
name,
)
self.routes.append(route)
return handler
return decorator
def add_route(
self,
handler,
uri: str,
methods: Iterable[str] = frozenset({"GET"}),
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
stream: bool = False,
):
"""
Create a blueprint route from a function.
:param handler: function for handling uri requests. Accepts function,
or class instance with a view_class method.
:param uri: endpoint at which the route will be accessible.
:param methods: list of acceptable HTTP methods.
:param host: IP Address of FQDN for the sanic server to use.
:param strict_slashes: Enforce the API urls are requested with a
training */*
:param version: Blueprint Version
:param name: user defined route name for url_for
:param stream: boolean specifying if the handler is a stream handler
:return: function or class instance
"""
# Handle HTTPMethodView differently
if hasattr(handler, "view_class"):
methods = set()
for method in HTTP_METHODS:
if getattr(handler.view_class, method.lower(), None):
methods.add(method)
if strict_slashes is None:
strict_slashes = self.strict_slashes
# handle composition view differently
if isinstance(handler, CompositionView):
methods = handler.handlers.keys()
self.route(
uri=uri,
methods=methods,
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)(handler)
return handler
def websocket(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Create a blueprint websocket route from a decorated function.
:param uri: endpoint at which the route will be accessible.
:param host: IP Address of FQDN for the sanic server to use.
:param strict_slashes: Enforce the API urls are requested with a
training */*
:param version: Blueprint Version
:param name: Unique name to identify the Websocket Route
"""
if strict_slashes is None:
strict_slashes = self.strict_slashes
def decorator(handler):
nonlocal uri
nonlocal host
nonlocal strict_slashes
nonlocal version
nonlocal name
name = f"{self.name}.{name or handler.__name__}"
route = FutureRoute(
handler, uri, [], host, strict_slashes, False, version, name
)
self.websocket_routes.append(route)
return handler
return decorator
def add_websocket_route(
self,
handler,
uri: str,
host: Optional[str] = None,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""Create a blueprint websocket route from a function.
:param handler: function for handling uri requests. Accepts function,
or class instance with a view_class method.
:param uri: endpoint at which the route will be accessible.
:param host: IP Address of FQDN for the sanic server to use.
:param version: Blueprint Version
:param name: Unique name to identify the Websocket Route
:return: function or class instance
"""
self.websocket(uri=uri, host=host, version=version, name=name)(handler)
return handler
def listener(self, event):
"""Create a listener from a decorated function.
:param event: Event to listen to.
"""
def decorator(listener):
self.listeners[event].append(listener)
return listener
return decorator
def middleware(self, *args, **kwargs):
"""
Create a blueprint middleware from a decorated function.
:param args: Positional arguments to be used while invoking the
middleware
:param kwargs: optional keyword args that can be used with the
middleware.
"""
def register_middleware(_middleware):
future_middleware = FutureMiddleware(_middleware, args, kwargs)
self.middlewares.append(future_middleware)
return _middleware
# Detect which way this was called, @middleware or @middleware('AT')
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
middleware = args[0]
args = []
return register_middleware(middleware)
else:
if kwargs.get("bp_group") and callable(args[0]):
middleware = args[0]
args = args[1:]
kwargs.pop("bp_group")
return register_middleware(middleware)
else:
return register_middleware
def exception(self, *args, **kwargs):
"""
This method enables the process of creating a global exception
handler for the current blueprint under question.
:param args: List of Python exceptions to be caught by the handler
:param kwargs: Additional optional arguments to be passed to the
exception handler
:return a decorated method to handle global exceptions for any
route registered under this blueprint.
"""
def decorator(handler):
exception = FutureException(handler, args, kwargs)
self.exceptions.append(exception)
return handler
return decorator
def static(self, uri: str, file_or_directory: str, *args, **kwargs):
"""Create a blueprint static route from a decorated function.
:param uri: endpoint at which the route will be accessible.
:param file_or_directory: Static asset.
"""
name = kwargs.pop("name", "static")
if not name.startswith(self.name + "."):
name = f"{self.name}.{name}"
kwargs.update(name=name)
strict_slashes = kwargs.get("strict_slashes")
if strict_slashes is None and self.strict_slashes is not None:
kwargs.update(strict_slashes=self.strict_slashes)
static = FutureStatic(uri, file_or_directory, args, kwargs)
self.statics.append(static)
# Shorthand method decorators
def get(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Add an API URL under the **GET** *HTTP* method
:param uri: URL to be tagged to **GET** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`sanic.app.Sanic` to check
if the request URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"GET"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
)
def post(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
stream: bool = False,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Add an API URL under the **POST** *HTTP* method
:param uri: URL to be tagged to **POST** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`sanic.app.Sanic` to check
if the request URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"POST"}),
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)
def put(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
stream: bool = False,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Add an API URL under the **PUT** *HTTP* method
:param uri: URL to be tagged to **PUT** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`sanic.app.Sanic` to check
if the request URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"PUT"}),
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)
def head(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Add an API URL under the **HEAD** *HTTP* method
:param uri: URL to be tagged to **HEAD** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`sanic.app.Sanic` to check
if the request URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"HEAD"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
)
def options(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Add an API URL under the **OPTIONS** *HTTP* method
:param uri: URL to be tagged to **OPTIONS** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`sanic.app.Sanic` to check
if the request URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"OPTIONS"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
)
def patch(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
stream=False,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Add an API URL under the **PATCH** *HTTP* method
:param uri: URL to be tagged to **PATCH** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`sanic.app.Sanic` to check
if the request URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"PATCH"}),
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)
def delete(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Add an API URL under the **DELETE** *HTTP* method
:param uri: URL to be tagged to **DELETE** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`sanic.app.Sanic` to check
if the request URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"DELETE"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
)
FutureRoute = namedtuple(
"FutureRoute",
[
"handler",
"uri",
"methods",
"host",
"strict_slashes",
"stream",
"version",
"name",
],
)
FutureListener = namedtuple(
"FutureListener", ["handler", "uri", "methods", "host"]
)
FutureMiddleware = namedtuple(
"FutureMiddleware", ["middleware", "args", "kwargs"]
)
FutureException = namedtuple("FutureException", ["handler", "args", "kwargs"])
FutureStatic = namedtuple(
"FutureStatic", ["uri", "file_or_directory", "args", "kwargs"]
)

View File

@ -1,14 +1,8 @@
from inspect import isclass
from os import environ from os import environ
from pathlib import Path
from typing import Any, Union from typing import Any, Union
# NOTE(tomaszdrozdz): remove in version: 21.3
# We replace from_envvar(), from_object(), from_pyfile() config object methods
# with one simpler update_config() method.
# We also replace "loading module from file code" in from_pyfile()
# in a favour of load_module_from_file_location().
# Please see pull request: 1903
# and issue: 1895
from .deprecated import from_envvar, from_object, from_pyfile # noqa
from .utils import load_module_from_file_location, str_to_bool from .utils import load_module_from_file_location, str_to_bool
@ -40,6 +34,7 @@ DEFAULT_CONFIG = {
"REAL_IP_HEADER": None, "REAL_IP_HEADER": None,
"PROXIES_COUNT": None, "PROXIES_COUNT": None,
"FORWARDED_FOR_HEADER": "X-Forwarded-For", "FORWARDED_FOR_HEADER": "X-Forwarded-For",
"REQUEST_ID_HEADER": "X-Request-ID",
"FALLBACK_ERROR_FORMAT": "html", "FALLBACK_ERROR_FORMAT": "html",
"REGISTER": True, "REGISTER": True,
} }
@ -68,17 +63,6 @@ class Config(dict):
def __setattr__(self, attr, value): def __setattr__(self, attr, value):
self[attr] = value self[attr] = value
# NOTE(tomaszdrozdz): remove in version: 21.3
# We replace from_envvar(), from_object(), from_pyfile() config object
# methods with one simpler update_config() method.
# We also replace "loading module from file code" in from_pyfile()
# in a favour of load_module_from_file_location().
# Please see pull request: 1903
# and issue: 1895
from_envvar = from_envvar
from_pyfile = from_pyfile
from_object = from_object
def load_environment_vars(self, prefix=SANIC_PREFIX): def load_environment_vars(self, prefix=SANIC_PREFIX):
""" """
Looks for prefixed environment variables and applies Looks for prefixed environment variables and applies
@ -99,19 +83,22 @@ class Config(dict):
self[config_key] = v self[config_key] = v
def update_config(self, config: Union[bytes, str, dict, Any]): def update_config(self, config: Union[bytes, str, dict, Any]):
"""Update app.config. """
Update app.config.
..note:: only upper case settings are considered. ..note:: only upper case settings are considered.
You can upload app config by providing path to py file You can upload app config by providing path to py file
holding settings. holding settings.
..code-block:: .. code-block:: python
# /some/py/file # /some/py/file
A = 1 A = 1
B = 2 B = 2
.. code-block:: python
config.update_config("${some}/py/file") config.update_config("${some}/py/file")
Yes you can put environment variable here, but they must be provided Yes you can put environment variable here, but they must be provided
@ -120,7 +107,7 @@ class Config(dict):
You can upload app config by providing dict holding settings. You can upload app config by providing dict holding settings.
..code-block:: .. code-block:: python
d = {"A": 1, "B": 2} d = {"A": 1, "B": 2}
config.update_config(d) config.update_config(d)
@ -128,19 +115,33 @@ class Config(dict):
You can upload app config by providing any object holding settings, You can upload app config by providing any object holding settings,
but in such case config.__dict__ will be used as dict holding settings. but in such case config.__dict__ will be used as dict holding settings.
..code-block:: .. code-block:: python
class C: class C:
A = 1 A = 1
B = 2 B = 2
config.update_config(C)"""
if isinstance(config, (bytes, str)): config.update_config(C)
"""
if isinstance(config, (bytes, str, Path)):
config = load_module_from_file_location(location=config) config = load_module_from_file_location(location=config)
if not isinstance(config, dict): if not isinstance(config, dict):
config = config.__dict__ cfg = {}
if not isclass(config):
cfg.update(
{
key: getattr(config, key)
for key in config.__class__.__dict__.keys()
}
)
config = dict(config.__dict__)
config.update(cfg)
config = dict(filter(lambda i: i[0].isupper(), config.items())) config = dict(filter(lambda i: i[0].isupper(), config.items()))
self.update(config) self.update(config)
load = update_config

View File

@ -109,7 +109,7 @@ class Cookie(dict):
if value is not False: if value is not False:
if key.lower() == "max-age": if key.lower() == "max-age":
if not str(value).isdigit(): if not str(value).isdigit():
value = DEFAULT_MAX_AGE raise ValueError("Cookie max-age must be an integer")
elif key.lower() == "expires": elif key.lower() == "expires":
if not isinstance(value, datetime): if not isinstance(value, datetime):
raise TypeError( raise TypeError(

View File

@ -1,106 +0,0 @@
# NOTE(tomaszdrozdz): remove in version: 21.3
# We replace from_envvar(), from_object(), from_pyfile() config object methods
# with one simpler update_config() method.
# We also replace "loading module from file code" in from_pyfile()
# in a favour of load_module_from_file_location().
# Please see pull request: 1903
# and issue: 1895
import types
from os import environ
from typing import Any
from warnings import warn
from sanic.exceptions import PyFileError
from sanic.helpers import import_string
def from_envvar(self, variable_name: str) -> bool:
"""Load a configuration from an environment variable pointing to
a configuration file.
:param variable_name: name of the environment variable
:return: bool. ``True`` if able to load config, ``False`` otherwise.
"""
warn(
"Using `from_envvar` method is deprecated and will be removed in "
"v21.3, use `app.update_config` method instead.",
DeprecationWarning,
stacklevel=2,
)
config_file = environ.get(variable_name)
if not config_file:
raise RuntimeError(
f"The environment variable {variable_name} is not set and "
f"thus configuration could not be loaded."
)
return self.from_pyfile(config_file)
def from_pyfile(self, filename: str) -> bool:
"""Update the values in the config from a Python file.
Only the uppercase variables in that module are stored in the config.
:param filename: an absolute path to the config file
"""
warn(
"Using `from_pyfile` method is deprecated and will be removed in "
"v21.3, use `app.update_config` method instead.",
DeprecationWarning,
stacklevel=2,
)
module = types.ModuleType("config")
module.__file__ = filename
try:
with open(filename) as config_file:
exec( # nosec
compile(config_file.read(), filename, "exec"),
module.__dict__,
)
except IOError as e:
e.strerror = "Unable to load configuration file (e.strerror)"
raise
except Exception as e:
raise PyFileError(filename) from e
self.from_object(module)
return True
def from_object(self, obj: Any) -> None:
"""Update the values from the given object.
Objects are usually either modules or classes.
Just the uppercase variables in that object are stored in the config.
Example usage::
from yourapplication import default_config
app.config.from_object(default_config)
or also:
app.config.from_object('myproject.config.MyConfigClass')
You should not use this function to load the actual configuration but
rather configuration defaults. The actual config should be loaded
with :meth:`from_pyfile` and ideally from a location not within the
package because the package might be installed system wide.
:param obj: an object holding the configuration
"""
warn(
"Using `from_object` method is deprecated and will be removed in "
"v21.3, use `app.update_config` method instead.",
DeprecationWarning,
stacklevel=2,
)
if isinstance(obj, str):
obj = import_string(obj)
for key in dir(obj):
if key.isupper():
self[key] = getattr(obj, key)

0
sanic/mixins/__init__.py Normal file
View File

19
sanic/mixins/base.py Normal file
View File

@ -0,0 +1,19 @@
class Base(type):
def __new__(cls, name, bases, attrs):
init = attrs.get("__init__")
def __init__(self, *args, **kwargs):
nonlocal init
for base in type(self).__bases__:
if base.__name__ != "BaseMixin":
base.__init__(self, *args, **kwargs)
if init:
init(self, *args, **kwargs)
attrs["__init__"] = __init__
return type.__new__(cls, name, bases, attrs)
class BaseMixin(metaclass=Base):
...

View File

@ -0,0 +1,38 @@
from enum import Enum, auto
from functools import partial
from typing import Set
from sanic.models.futures import FutureException
class ExceptionMixin:
def __init__(self, *args, **kwargs) -> None:
self._future_exceptions: Set[FutureException] = set()
def _apply_exception_handler(self, handler: FutureException):
raise NotImplementedError
def exception(self, *exceptions, apply=True):
"""
This method enables the process of creating a global exception
handler for the current blueprint under question.
:param args: List of Python exceptions to be caught by the handler
:param kwargs: Additional optional arguments to be passed to the
exception handler
:return a decorated method to handle global exceptions for any
route registered under this blueprint.
"""
def decorator(handler):
nonlocal apply
nonlocal exceptions
future_exception = FutureException(handler, exceptions)
self._future_exceptions.add(future_exception)
if apply:
self._apply_exception_handler(future_exception)
return handler
return decorator

71
sanic/mixins/listeners.py Normal file
View File

@ -0,0 +1,71 @@
from enum import Enum, auto
from functools import partial
from typing import Any, Callable, Coroutine, Optional, Set, Union
from sanic.models.futures import FutureListener
class ListenerEvent(str, Enum):
def _generate_next_value_(name: str, *args) -> str: # type: ignore
return name.lower()
BEFORE_SERVER_START = auto()
AFTER_SERVER_START = auto()
BEFORE_SERVER_STOP = auto()
AFTER_SERVER_STOP = auto()
class ListenerMixin:
def __init__(self, *args, **kwargs) -> None:
self._future_listeners: Set[FutureListener] = set()
def _apply_listener(self, listener: FutureListener):
raise NotImplementedError
def listener(
self,
listener_or_event: Union[Callable[..., Coroutine[Any, Any, None]]],
event_or_none: Optional[str] = None,
apply: bool = True,
):
"""
Create a listener from a decorated function.
To be used as a deocrator:
.. code-block:: python
@bp.listener("before_server_start")
async def before_server_start(app, loop):
...
`See user guide <https://sanicframework.org/guide/basics/listeners.html#listeners>`__
:param event: event to listen to
"""
def register_listener(listener, event):
nonlocal apply
future_listener = FutureListener(listener, event)
self._future_listeners.add(future_listener)
if apply:
self._apply_listener(future_listener)
return listener
if callable(listener_or_event):
return register_listener(listener_or_event, event_or_none)
else:
return partial(register_listener, event=listener_or_event)
def before_server_start(self, listener):
return self.listener(listener, "before_server_start")
def after_server_start(self, listener):
return self.listener(listener, "after_server_start")
def before_server_stop(self, listener):
return self.listener(listener, "before_server_stop")
def after_server_stop(self, listener):
return self.listener(listener, "after_server_stop")

View File

@ -0,0 +1,51 @@
from functools import partial
from typing import Set
from sanic.models.futures import FutureMiddleware
class MiddlewareMixin:
def __init__(self, *args, **kwargs) -> None:
self._future_middleware: Set[FutureMiddleware] = set()
def _apply_middleware(self, middleware: FutureMiddleware):
raise NotImplementedError
def middleware(
self, middleware_or_request, attach_to="request", apply=True
):
"""
Decorate and register middleware to be called before a request.
Can either be called as *@app.middleware* or
*@app.middleware('request')*
`See user guide <https://sanicframework.org/guide/basics/middleware.html>`__
:param: middleware_or_request: Optional parameter to use for
identifying which type of middleware is being registered.
"""
def register_middleware(middleware, attach_to="request"):
nonlocal apply
future_middleware = FutureMiddleware(middleware, attach_to)
self._future_middleware.add(future_middleware)
if apply:
self._apply_middleware(future_middleware)
return middleware
# Detect which way this was called, @middleware or @middleware('AT')
if callable(middleware_or_request):
return register_middleware(
middleware_or_request, attach_to=attach_to
)
else:
return partial(
register_middleware, attach_to=middleware_or_request
)
def on_request(self, middleware):
return self.middleware(middleware, "request")
def on_response(self, middleware):
return self.middleware(middleware, "response")

570
sanic/mixins/routes.py Normal file
View File

@ -0,0 +1,570 @@
from functools import partial
from inspect import signature
from pathlib import PurePath
from typing import Iterable, List, Optional, Set, Union
from sanic_routing.route import Route
from sanic.constants import HTTP_METHODS
from sanic.models.futures import FutureRoute, FutureStatic
from sanic.views import CompositionView
class RouteMixin:
def __init__(self, *args, **kwargs) -> None:
self._future_routes: Set[FutureRoute] = set()
self._future_statics: Set[FutureStatic] = set()
self.name = ""
self.strict_slashes = False
def _apply_route(self, route: FutureRoute) -> Route:
raise NotImplementedError
def _apply_static(self, static: FutureStatic) -> Route:
raise NotImplementedError
def route(
self,
uri: str,
methods: Iterable[str] = frozenset({"GET"}),
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
stream: bool = False,
version: Optional[int] = None,
name: Optional[str] = None,
ignore_body: bool = False,
apply: bool = True,
subprotocols: Optional[List[str]] = None,
websocket: bool = False,
):
"""
Decorate a function to be registered as a route
:param uri: path of the URL
:param methods: list or tuple of methods allowed
:param host: the host, if required
:param strict_slashes: whether to apply strict slashes to the route
:param stream: whether to allow the request to stream its body
:param version: route specific versioning
:param name: user defined route name for url_for
:param ignore_body: whether the handler should ignore request
body (eg. GET requests)
:return: tuple of routes, decorated function
"""
if websocket:
self.enable_websocket()
# Fix case where the user did not prefix the URL with a /
# and will probably get confused as to why it's not working
if not uri.startswith("/"):
uri = "/" + uri
if strict_slashes is None:
strict_slashes = self.strict_slashes
def decorator(handler):
nonlocal uri
nonlocal methods
nonlocal host
nonlocal strict_slashes
nonlocal stream
nonlocal version
nonlocal name
nonlocal ignore_body
nonlocal subprotocols
nonlocal websocket
if isinstance(handler, tuple):
# if a handler fn is already wrapped in a route, the handler
# variable will be a tuple of (existing routes, handler fn)
_, handler = handler
if websocket:
websocket_handler = partial(
self._websocket_handler,
handler,
subprotocols=subprotocols,
)
websocket_handler.__name__ = (
"websocket_handler_" + handler.__name__
)
websocket_handler.is_websocket = True
handler = websocket_handler
# TODO:
# - THink this thru.... do we want all routes namespaced?
# -
name = self._generate_name(handler, name)
route = FutureRoute(
handler,
uri,
methods,
host,
strict_slashes,
stream,
version,
name,
ignore_body,
)
self._future_routes.add(route)
args = list(signature(handler).parameters.keys())
if websocket and len(args) < 2:
handler_name = handler.__name__
raise ValueError(
f"Required parameter `request` and/or `ws` missing "
f"in the {handler_name}() route?"
)
elif not args:
handler_name = handler.__name__
raise ValueError(
f"Required parameter `request` missing "
f"in the {handler_name}() route?"
)
if not websocket and stream:
handler.is_stream = stream
if apply:
self._apply_route(route)
return route, handler
return decorator
def add_route(
self,
handler,
uri: str,
methods: Iterable[str] = frozenset({"GET"}),
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
stream: bool = False,
):
"""A helper method to register class instance or
functions as a handler to the application url
routes.
:param handler: function or class instance
:param uri: path of the URL
:param methods: list or tuple of methods allowed, these are overridden
if using a HTTPMethodView
:param host:
:param strict_slashes:
:param version:
:param name: user defined route name for url_for
:param stream: boolean specifying if the handler is a stream handler
:return: function or class instance
"""
# Handle HTTPMethodView differently
if hasattr(handler, "view_class"):
methods = set()
for method in HTTP_METHODS:
_handler = getattr(handler.view_class, method.lower(), None)
if _handler:
methods.add(method)
if hasattr(_handler, "is_stream"):
stream = True
# handle composition view differently
if isinstance(handler, CompositionView):
methods = handler.handlers.keys()
for _handler in handler.handlers.values():
if hasattr(_handler, "is_stream"):
stream = True
break
if strict_slashes is None:
strict_slashes = self.strict_slashes
self.route(
uri=uri,
methods=methods,
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)(handler)
return handler
# Shorthand method decorators
def get(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
ignore_body: bool = True,
):
"""
Add an API URL under the **GET** *HTTP* method
:param uri: URL to be tagged to **GET** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"GET"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
ignore_body=ignore_body,
)
def post(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
stream: bool = False,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Add an API URL under the **POST** *HTTP* method
:param uri: URL to be tagged to **POST** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"POST"}),
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)
def put(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
stream: bool = False,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Add an API URL under the **PUT** *HTTP* method
:param uri: URL to be tagged to **PUT** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"PUT"}),
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)
def head(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
ignore_body: bool = True,
):
"""
Add an API URL under the **HEAD** *HTTP* method
:param uri: URL to be tagged to **HEAD** method of *HTTP*
:type uri: str
:param host: Host IP or FQDN for the service to use
:type host: Optional[str], optional
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:type strict_slashes: Optional[bool], optional
:param version: API Version
:type version: Optional[str], optional
:param name: Unique name that can be used to identify the Route
:type name: Optional[str], optional
:param ignore_body: whether the handler should ignore request
body (eg. GET requests), defaults to True
:type ignore_body: bool, optional
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"HEAD"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
ignore_body=ignore_body,
)
def options(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
ignore_body: bool = True,
):
"""
Add an API URL under the **OPTIONS** *HTTP* method
:param uri: URL to be tagged to **OPTIONS** method of *HTTP*
:type uri: str
:param host: Host IP or FQDN for the service to use
:type host: Optional[str], optional
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:type strict_slashes: Optional[bool], optional
:param version: API Version
:type version: Optional[str], optional
:param name: Unique name that can be used to identify the Route
:type name: Optional[str], optional
:param ignore_body: whether the handler should ignore request
body (eg. GET requests), defaults to True
:type ignore_body: bool, optional
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"OPTIONS"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
ignore_body=ignore_body,
)
def patch(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
stream=False,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Add an API URL under the **PATCH** *HTTP* method
:param uri: URL to be tagged to **PATCH** method of *HTTP*
:type uri: str
:param host: Host IP or FQDN for the service to use
:type host: Optional[str], optional
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:type strict_slashes: Optional[bool], optional
:param stream: whether to allow the request to stream its body
:type stream: Optional[bool], optional
:param version: API Version
:type version: Optional[str], optional
:param name: Unique name that can be used to identify the Route
:type name: Optional[str], optional
:param ignore_body: whether the handler should ignore request
body (eg. GET requests), defaults to True
:type ignore_body: bool, optional
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"PATCH"}),
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)
def delete(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
version: Optional[int] = None,
name: Optional[str] = None,
ignore_body: bool = True,
):
"""
Add an API URL under the **DELETE** *HTTP* method
:param uri: URL to be tagged to **DELETE** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"DELETE"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
ignore_body=ignore_body,
)
def websocket(
self,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
subprotocols: Optional[List[str]] = None,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
Decorate a function to be registered as a websocket route
:param uri: path of the URL
:param host: Host IP or FQDN details
:param strict_slashes: If the API endpoint needs to terminate
with a "/" or not
:param subprotocols: optional list of str with supported subprotocols
:param name: A unique name assigned to the URL so that it can
be used with :func:`url_for`
:return: tuple of routes, decorated function
"""
return self.route(
uri=uri,
host=host,
methods=None,
strict_slashes=strict_slashes,
version=version,
name=name,
apply=apply,
subprotocols=subprotocols,
websocket=True,
)
def add_websocket_route(
self,
handler,
uri: str,
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
subprotocols=None,
version: Optional[int] = None,
name: Optional[str] = None,
):
"""
A helper method to register a function as a websocket route.
:param handler: a callable function or instance of a class
that can handle the websocket request
:param host: Host IP or FQDN details
:param uri: URL path that will be mapped to the websocket
handler
handler
:param strict_slashes: If the API endpoint needs to terminate
with a "/" or not
:param subprotocols: Subprotocols to be used with websocket
handshake
:param name: A unique name assigned to the URL so that it can
be used with :func:`url_for`
:return: Objected decorated by :func:`websocket`
"""
return self.websocket(
uri=uri,
host=host,
strict_slashes=strict_slashes,
subprotocols=subprotocols,
version=version,
name=name,
)(handler)
def static(
self,
uri,
file_or_directory: Union[str, bytes, PurePath],
pattern=r"/?.+",
use_modified_since=True,
use_content_range=False,
stream_large_files=False,
name="static",
host=None,
strict_slashes=None,
content_type=None,
apply=True,
):
"""
Register a root to serve files from. The input can either be a
file or a directory. This method will enable an easy and simple way
to setup the :class:`Route` necessary to serve the static files.
:param uri: URL path to be used for serving static content
:param file_or_directory: Path for the Static file/directory with
static files
:param pattern: Regex Pattern identifying the valid static files
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the
:func:`StreamingHTTPResponse.file_stream` handler rather
than the :func:`HTTPResponse.file` handler to send the file.
If this is an integer, this represents the threshold size to
switch to :func:`StreamingHTTPResponse.file_stream`
:param name: user defined name used for url_for
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param content_type: user defined content type for header
:return: routes registered on the router
:rtype: List[sanic.router.Route]
"""
if not name.startswith(self.name + "."):
name = f"{self.name}.{name}"
if strict_slashes is None and self.strict_slashes is not None:
strict_slashes = self.strict_slashes
static = FutureStatic(
uri,
file_or_directory,
pattern,
use_modified_since,
use_content_range,
stream_large_files,
name,
host,
strict_slashes,
content_type,
)
self._future_statics.add(static)
if apply:
self._apply_static(static)
def _generate_name(self, handler, name: str) -> str:
return name or handler.__name__

0
sanic/models/__init__.py Normal file
View File

35
sanic/models/futures.py Normal file
View File

@ -0,0 +1,35 @@
from collections import namedtuple
FutureRoute = namedtuple(
"FutureRoute",
[
"handler",
"uri",
"methods",
"host",
"strict_slashes",
"stream",
"version",
"name",
"ignore_body",
],
)
FutureListener = namedtuple("FutureListener", ["listener", "event"])
FutureMiddleware = namedtuple("FutureMiddleware", ["middleware", "attach_to"])
FutureException = namedtuple("FutureException", ["handler", "exceptions"])
FutureStatic = namedtuple(
"FutureStatic",
[
"uri",
"file_or_directory",
"pattern",
"use_modified_since",
"use_content_range",
"stream_large_files",
"name",
"host",
"strict_slashes",
"content_type",
],
)

View File

@ -1,4 +1,5 @@
import email.utils import email.utils
import uuid
from collections import defaultdict, namedtuple from collections import defaultdict, namedtuple
from http.cookies import SimpleCookie from http.cookies import SimpleCookie
@ -51,6 +52,7 @@ class Request:
__slots__ = ( __slots__ = (
"__weakref__", "__weakref__",
"_cookies", "_cookies",
"_id",
"_ip", "_ip",
"_parsed_url", "_parsed_url",
"_port", "_port",
@ -82,6 +84,7 @@ class Request:
self.raw_url = url_bytes self.raw_url = url_bytes
# TODO: Content-Encoding detection # TODO: Content-Encoding detection
self._parsed_url = parse_url(url_bytes) self._parsed_url = parse_url(url_bytes)
self._id = None
self.app = app self.app = app
self.headers = headers self.headers = headers
@ -110,6 +113,10 @@ class Request:
class_name = self.__class__.__name__ class_name = self.__class__.__name__
return f"<{class_name}: {self.method} {self.path}>" return f"<{class_name}: {self.method} {self.path}>"
@classmethod
def generate_id(*_):
return uuid.uuid4()
async def respond( async def respond(
self, response=None, *, status=200, headers=None, content_type=None self, response=None, *, status=200, headers=None, content_type=None
): ):
@ -148,6 +155,26 @@ class Request:
if not self.body: if not self.body:
self.body = b"".join([data async for data in self.stream]) self.body = b"".join([data async for data in self.stream])
@property
def id(self):
if not self._id:
self._id = self.headers.get(
self.app.config.REQUEST_ID_HEADER,
self.__class__.generate_id(self),
)
# Try casting to a UUID or an integer
if isinstance(self._id, str):
try:
self._id = uuid.UUID(self._id)
except ValueError:
try:
self._id = int(self._id)
except ValueError:
...
return self._id
@property @property
def json(self): def json(self):
if self.parsed_json is None: if self.parsed_json is None:

View File

@ -1,133 +1,36 @@
import re
import uuid
from collections import defaultdict, namedtuple
from collections.abc import Iterable
from functools import lru_cache from functools import lru_cache
from urllib.parse import unquote
from sanic.exceptions import MethodNotSupported, NotFound from sanic_routing import BaseRouter
from sanic.views import CompositionView from sanic_routing.route import Route
from sanic.constants import HTTP_METHODS
from sanic.request import Request
Route = namedtuple( class Router(BaseRouter):
"Route", DEFAULT_METHOD = "GET"
[ ALLOWED_METHODS = HTTP_METHODS
"handler",
"methods",
"pattern",
"parameters",
"name",
"uri",
"endpoint",
"ignore_body",
],
)
Parameter = namedtuple("Parameter", ["name", "cast"])
REGEX_TYPES = { @lru_cache
"string": (str, r"[^/]+"), def get(self, request: Request):
"int": (int, r"-?\d+"), route, handler, params = self.resolve(
"number": (float, r"-?(?:\d+(?:\.\d*)?|\.\d+)"), path=request.path,
"alpha": (str, r"[A-Za-z]+"), method=request.method,
"path": (str, r"[^/].*?"),
"uuid": (
uuid.UUID,
r"[A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-"
r"[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}",
),
}
ROUTER_CACHE_SIZE = 1024
def url_hash(url):
return url.count("/")
class RouteExists(Exception):
pass
class RouteDoesNotExist(Exception):
pass
class ParameterNameConflicts(Exception):
pass
class Router:
"""Router supports basic routing with parameters and method checks
Usage:
.. code-block:: python
@sanic.route('/my/url/<my_param>', methods=['GET', 'POST', ...])
def my_route(request, my_param):
do stuff...
or
.. code-block:: python
@sanic.route('/my/url/<my_param:my_type>', methods['GET', 'POST', ...])
def my_route_with_type(request, my_param: my_type):
do stuff...
Parameters will be passed as keyword arguments to the request handling
function. Provided parameters can also have a type by appending :type to
the <parameter>. Given parameter must be able to be type-casted to this.
If no type is provided, a string is expected. A regular expression can
also be passed in as the type. The argument given to the function will
always be a string, independent of the type.
"""
routes_static = None
routes_dynamic = None
routes_always_check = None
parameter_pattern = re.compile(r"<(.+?)>")
def __init__(self, app):
self.app = app
self.routes_all = {}
self.routes_names = {}
self.routes_static_files = {}
self.routes_static = {}
self.routes_dynamic = defaultdict(list)
self.routes_always_check = []
self.hosts = set()
@classmethod
def parse_parameter_string(cls, parameter_string):
"""Parse a parameter string into its constituent name, type, and
pattern
For example::
parse_parameter_string('<param_one:[A-z]>')` ->
('param_one', str, '[A-z]')
:param parameter_string: String to parse
:return: tuple containing
(parameter_name, parameter_type, parameter_pattern)
"""
# We could receive NAME or NAME:PATTERN
name = parameter_string
pattern = "string"
if ":" in parameter_string:
name, pattern = parameter_string.split(":", 1)
if not name:
raise ValueError(
f"Invalid parameter syntax: {parameter_string}"
) )
default = (str, pattern) # TODO: Implement response
# Pull from pre-configured types # - args,
_type, pattern = REGEX_TYPES.get(pattern, default) # - endpoint,
return name, _type, pattern return (
handler,
(),
params,
route.path,
route.name,
None,
route.ctx.ignore_body,
)
def add( def add(
self, self,
@ -136,368 +39,24 @@ class Router:
handler, handler,
host=None, host=None,
strict_slashes=False, strict_slashes=False,
stream=False,
ignore_body=False, ignore_body=False,
version=None, version=None,
name=None, name=None,
): ) -> Route:
"""Add a handler to the route list # TODO: Implement
# - host
:param uri: path to match # - strict_slashes
:param methods: sequence of accepted method names. If none are # - ignore_body
provided, any method is allowed # - stream
:param handler: request handler function.
When executed, it should provide a response object.
:param strict_slashes: strict to trailing slash
:param ignore_body: Handler should not read the body, if any
:param version: current version of the route or blueprint. See
docs for further details.
:return: Nothing
"""
routes = []
if version is not None: if version is not None:
version = re.escape(str(version).strip("/").lstrip("v")) version = str(version).strip("/").lstrip("v")
uri = "/".join([f"/v{version}", uri.lstrip("/")]) uri = "/".join([f"/v{version}", uri.lstrip("/")])
# add regular version
routes.append( route = super().add(
self._add(uri, methods, handler, host, name, ignore_body) path=uri, handler=handler, methods=methods, name=name
) )
route.ctx.ignore_body = ignore_body
route.ctx.stream = stream
if strict_slashes:
return routes
if not isinstance(host, str) and host is not None:
# we have gotten back to the top of the recursion tree where the
# host was originally a list. By now, we've processed the strict
# slashes logic on the leaf nodes (the individual host strings in
# the list of host)
return routes
# Add versions with and without trailing /
slashed_methods = self.routes_all.get(uri + "/", frozenset({}))
unslashed_methods = self.routes_all.get(uri[:-1], frozenset({}))
if isinstance(methods, Iterable):
_slash_is_missing = all(
method in slashed_methods for method in methods
)
_without_slash_is_missing = all(
method in unslashed_methods for method in methods
)
else:
_slash_is_missing = methods in slashed_methods
_without_slash_is_missing = methods in unslashed_methods
slash_is_missing = not uri[-1] == "/" and not _slash_is_missing
without_slash_is_missing = (
uri[-1] == "/" and not _without_slash_is_missing and not uri == "/"
)
# add version with trailing slash
if slash_is_missing:
routes.append(
self._add(uri + "/", methods, handler, host, name, ignore_body)
)
# add version without trailing slash
elif without_slash_is_missing:
routes.append(
self._add(uri[:-1], methods, handler, host, name, ignore_body)
)
return routes
def _add(
self, uri, methods, handler, host=None, name=None, ignore_body=False
):
"""Add a handler to the route list
:param uri: path to match
:param methods: sequence of accepted method names. If none are
provided, any method is allowed
:param handler: request handler function.
When executed, it should provide a response object.
:param name: user defined route name for url_for
:return: Nothing
"""
if host is not None:
if isinstance(host, str):
uri = host + uri
self.hosts.add(host)
else:
if not isinstance(host, Iterable):
raise ValueError(
f"Expected either string or Iterable of "
f"host strings, not {host!r}"
)
for host_ in host:
self.add(uri, methods, handler, host_, name)
return
# Dict for faster lookups of if method allowed
if methods:
methods = frozenset(methods)
parameters = []
parameter_names = set()
properties = {"unhashable": None}
def add_parameter(match):
name = match.group(1)
name, _type, pattern = self.parse_parameter_string(name)
if name in parameter_names:
raise ParameterNameConflicts(
f"Multiple parameter named <{name}> " f"in route uri {uri}"
)
parameter_names.add(name)
parameter = Parameter(name=name, cast=_type)
parameters.append(parameter)
# Mark the whole route as unhashable if it has the hash key in it
if re.search(r"(^|[^^]){1}/", pattern):
properties["unhashable"] = True
# Mark the route as unhashable if it matches the hash key
elif re.search(r"/", pattern):
properties["unhashable"] = True
return f"({pattern})"
pattern_string = re.sub(self.parameter_pattern, add_parameter, uri)
pattern = re.compile(fr"^{pattern_string}$")
def merge_route(route, methods, handler):
# merge to the existing route when possible.
if not route.methods or not methods:
# method-unspecified routes are not mergeable.
raise RouteExists(f"Route already registered: {uri}")
elif route.methods.intersection(methods):
# already existing method is not overloadable.
duplicated = methods.intersection(route.methods)
duplicated_methods = ",".join(list(duplicated))
raise RouteExists(
f"Route already registered: {uri} [{duplicated_methods}]"
)
if isinstance(route.handler, CompositionView):
view = route.handler
else:
view = CompositionView()
view.add(route.methods, route.handler)
view.add(methods, handler)
route = route._replace(
handler=view, methods=methods.union(route.methods)
)
return route return route
if parameters:
# TODO: This is too complex, we need to reduce the complexity
if properties["unhashable"]:
routes_to_check = self.routes_always_check
ndx, route = self.check_dynamic_route_exists(
pattern, routes_to_check, parameters
)
else:
routes_to_check = self.routes_dynamic[url_hash(uri)]
ndx, route = self.check_dynamic_route_exists(
pattern, routes_to_check, parameters
)
if ndx != -1:
# Pop the ndx of the route, no dups of the same route
routes_to_check.pop(ndx)
else:
route = self.routes_all.get(uri)
# prefix the handler name with the blueprint name
# if available
# special prefix for static files
is_static = False
if name and name.startswith("_static_"):
is_static = True
name = name.split("_static_", 1)[-1]
if hasattr(handler, "__blueprintname__"):
bp_name = handler.__blueprintname__
handler_name = f"{bp_name}.{name or handler.__name__}"
else:
handler_name = name or getattr(
handler, "__name__", handler.__class__.__name__
)
if route:
route = merge_route(route, methods, handler)
else:
endpoint = self.app._build_endpoint_name(handler_name)
route = Route(
handler=handler,
methods=methods,
pattern=pattern,
parameters=parameters,
name=handler_name,
uri=uri,
endpoint=endpoint,
ignore_body=ignore_body,
)
self.routes_all[uri] = route
if is_static:
pair = self.routes_static_files.get(handler_name)
if not (pair and (pair[0] + "/" == uri or uri + "/" == pair[0])):
self.routes_static_files[handler_name] = (uri, route)
else:
pair = self.routes_names.get(handler_name)
if not (pair and (pair[0] + "/" == uri or uri + "/" == pair[0])):
self.routes_names[handler_name] = (uri, route)
if properties["unhashable"]:
self.routes_always_check.append(route)
elif parameters:
self.routes_dynamic[url_hash(uri)].append(route)
else:
self.routes_static[uri] = route
return route
@staticmethod
def check_dynamic_route_exists(pattern, routes_to_check, parameters):
"""
Check if a URL pattern exists in a list of routes provided based on
the comparison of URL pattern and the parameters.
:param pattern: URL parameter pattern
:param routes_to_check: list of dynamic routes either hashable or
unhashable routes.
:param parameters: List of :class:`Parameter` items
:return: Tuple of index and route if matching route exists else
-1 for index and None for route
"""
for ndx, route in enumerate(routes_to_check):
if route.pattern == pattern and route.parameters == parameters:
return ndx, route
else:
return -1, None
@lru_cache(maxsize=ROUTER_CACHE_SIZE)
def find_route_by_view_name(self, view_name, name=None):
"""Find a route in the router based on the specified view name.
:param view_name: string of view name to search by
:param kwargs: additional params, usually for static files
:return: tuple containing (uri, Route)
"""
if not view_name:
return (None, None)
if view_name == "static" or view_name.endswith(".static"):
return self.routes_static_files.get(name, (None, None))
return self.routes_names.get(view_name, (None, None))
def get(self, request):
"""Get a request handler based on the URL of the request, or raises an
error
:param request: Request object
:return: handler, arguments, keyword arguments
"""
# No virtual hosts specified; default behavior
if not self.hosts:
return self._get(request.path, request.method, "")
# virtual hosts specified; try to match route to the host header
try:
return self._get(
request.path, request.method, request.headers.get("Host", "")
)
# try default hosts
except NotFound:
return self._get(request.path, request.method, "")
def get_supported_methods(self, url):
"""Get a list of supported methods for a url and optional host.
:param url: URL string (including host)
:return: frozenset of supported methods
"""
route = self.routes_all.get(url)
# if methods are None then this logic will prevent an error
return getattr(route, "methods", None) or frozenset()
@lru_cache(maxsize=ROUTER_CACHE_SIZE)
def _get(self, url, method, host):
"""Get a request handler based on the URL of the request, or raises an
error. Internal method for caching.
:param url: request URL
:param method: request method
:return: handler, arguments, keyword arguments
"""
url = unquote(host + url)
# Check against known static routes
route = self.routes_static.get(url)
method_not_supported = MethodNotSupported(
f"Method {method} not allowed for URL {url}",
method=method,
allowed_methods=self.get_supported_methods(url),
)
if route:
if route.methods and method not in route.methods:
raise method_not_supported
match = route.pattern.match(url)
else:
route_found = False
# Move on to testing all regex routes
for route in self.routes_dynamic[url_hash(url)]:
match = route.pattern.match(url)
route_found |= match is not None
# Do early method checking
if match and method in route.methods:
break
else:
# Lastly, check against all regex routes that cannot be hashed
for route in self.routes_always_check:
match = route.pattern.match(url)
route_found |= match is not None
# Do early method checking
if match and method in route.methods:
break
else:
# Route was found but the methods didn't match
if route_found:
raise method_not_supported
raise NotFound(f"Requested URL {url} not found")
kwargs = {
p.name: p.cast(value)
for value, p in zip(match.groups(1), route.parameters)
}
route_handler = route.handler
if hasattr(route_handler, "handlers"):
route_handler = route_handler.handlers[method]
return (
route_handler,
[],
kwargs,
route.uri,
route.name,
route.endpoint,
route.ignore_body,
)
def is_stream_handler(self, request):
"""Handler for request is stream or not.
:param request: Request object
:return: bool
"""
try:
handler = self.get(request)[0]
except (NotFound, MethodNotSupported):
return False
if hasattr(handler, "view_class") and hasattr(
handler.view_class, request.method.lower()
):
handler = getattr(handler.view_class, request.method.lower())
return hasattr(handler, "is_stream")

View File

@ -1,6 +1,7 @@
from functools import partial, wraps from functools import partial, wraps
from mimetypes import guess_type from mimetypes import guess_type
from os import path from os import path
from pathlib import PurePath
from re import sub from re import sub
from time import gmtime, strftime from time import gmtime, strftime
from urllib.parse import unquote from urllib.parse import unquote
@ -14,6 +15,7 @@ from sanic.exceptions import (
) )
from sanic.handlers import ContentRangeHandler from sanic.handlers import ContentRangeHandler
from sanic.log import error_logger from sanic.log import error_logger
from sanic.models.futures import FutureStatic
from sanic.response import HTTPResponse, file, file_stream from sanic.response import HTTPResponse, file, file_stream
@ -110,16 +112,7 @@ async def _static_request_handler(
def register( def register(
app, app,
uri, static: FutureStatic,
file_or_directory,
pattern,
use_modified_since,
use_content_range,
stream_large_files,
name="static",
host=None,
strict_slashes=None,
content_type=None,
): ):
# TODO: Though sanic is not a file server, I feel like we should at least # TODO: Though sanic is not a file server, I feel like we should at least
# make a good effort here. Modified-since is nice, but we could # make a good effort here. Modified-since is nice, but we could
@ -130,7 +123,9 @@ def register(
:param app: Sanic :param app: Sanic
:param file_or_directory: File or directory path to serve from :param file_or_directory: File or directory path to serve from
:type file_or_directory: Union[str,bytes,Path]
:param uri: URL to serve from :param uri: URL to serve from
:type uri: str
:param pattern: regular expression used to match files in the URL :param pattern: regular expression used to match files in the URL
:param use_modified_since: If true, send file modified time, and return :param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the not modified if the browser's matches the
@ -142,35 +137,48 @@ def register(
If this is an integer, this represents the If this is an integer, this represents the
threshold size to switch to file_stream() threshold size to switch to file_stream()
:param name: user defined name used for url_for :param name: user defined name used for url_for
:type name: str
:param content_type: user defined content type for header :param content_type: user defined content type for header
:return: registered static routes :return: registered static routes
:rtype: List[sanic.router.Route] :rtype: List[sanic.router.Route]
""" """
if isinstance(static.file_or_directory, bytes):
file_or_directory = static.file_or_directory.decode("utf-8")
elif isinstance(static.file_or_directory, PurePath):
file_or_directory = str(static.file_or_directory)
elif not isinstance(static.file_or_directory, str):
raise ValueError("Invalid file path string.")
else:
file_or_directory = static.file_or_directory
uri = static.uri
name = static.name
# If we're not trying to match a file directly, # If we're not trying to match a file directly,
# serve from the folder # serve from the folder
if not path.isfile(file_or_directory): if not path.isfile(file_or_directory):
uri += "<file_uri:" + pattern + ">" uri += "<file_uri:" + static.pattern + ">"
# special prefix for static files # special prefix for static files
if not name.startswith("_static_"): if not static.name.startswith("_static_"):
name = f"_static_{name}" name = f"_static_{static.name}"
_handler = wraps(_static_request_handler)( _handler = wraps(_static_request_handler)(
partial( partial(
_static_request_handler, _static_request_handler,
file_or_directory, file_or_directory,
use_modified_since, static.use_modified_since,
use_content_range, static.use_content_range,
stream_large_files, static.stream_large_files,
content_type=content_type, content_type=static.content_type,
) )
) )
_routes, _ = app.route( _routes, _ = app.route(
uri, uri=uri,
methods=["GET", "HEAD"], methods=["GET", "HEAD"],
name=name, name=name,
host=host, host=static.host,
strict_slashes=strict_slashes, strict_slashes=static.strict_slashes,
)(_handler) )(_handler)
return _routes return _routes

View File

@ -1,284 +0,0 @@
from json import JSONDecodeError
from socket import socket
import httpx
import websockets
from sanic.asgi import ASGIApp
from sanic.exceptions import MethodNotSupported
from sanic.log import logger
from sanic.response import text
ASGI_HOST = "mockserver"
ASGI_PORT = 1234
ASGI_BASE_URL = f"http://{ASGI_HOST}:{ASGI_PORT}"
HOST = "127.0.0.1"
PORT = None
class SanicTestClient:
def __init__(self, app, port=PORT, host=HOST):
"""Use port=None to bind to a random port"""
self.app = app
self.port = port
self.host = host
@app.listener("after_server_start")
def _start_test_mode(sanic, *args, **kwargs):
sanic.test_mode = True
@app.listener("before_server_end")
def _end_test_mode(sanic, *args, **kwargs):
sanic.test_mode = False
def get_new_session(self):
return httpx.AsyncClient(verify=False)
async def _local_request(self, method, url, *args, **kwargs):
logger.info(url)
raw_cookies = kwargs.pop("raw_cookies", None)
if method == "websocket":
async with websockets.connect(url, *args, **kwargs) as websocket:
websocket.opened = websocket.open
return websocket
else:
async with self.get_new_session() as session:
try:
if method == "request":
args = [url] + list(args)
url = kwargs.pop("http_method", "GET").upper()
response = await getattr(session, method.lower())(
url, *args, **kwargs
)
except httpx.HTTPError as e:
if hasattr(e, "response"):
response = e.response
else:
logger.error(
f"{method.upper()} {url} received no response!",
exc_info=True,
)
return None
response.body = await response.aread()
response.status = response.status_code
response.content_type = response.headers.get("content-type")
# response can be decoded as json after response._content
# is set by response.aread()
try:
response.json = response.json()
except (JSONDecodeError, UnicodeDecodeError):
response.json = None
if raw_cookies:
response.raw_cookies = {}
for cookie in response.cookies.jar:
response.raw_cookies[cookie.name] = cookie
return response
def _sanic_endpoint_test(
self,
method="get",
uri="/",
gather_request=True,
debug=False,
server_kwargs={"auto_reload": False},
host=None,
*request_args,
**request_kwargs,
):
results = [None, None]
exceptions = []
if gather_request:
def _collect_request(request):
if results[0] is None:
results[0] = request
self.app.request_middleware.appendleft(_collect_request)
@self.app.exception(MethodNotSupported)
async def error_handler(request, exception):
if request.method in ["HEAD", "PATCH", "PUT", "DELETE"]:
return text(
"", exception.status_code, headers=exception.headers
)
else:
return self.app.error_handler.default(request, exception)
if self.port:
server_kwargs = dict(
host=host or self.host,
port=self.port,
**server_kwargs,
)
host, port = host or self.host, self.port
else:
sock = socket()
sock.bind((host or self.host, 0))
server_kwargs = dict(sock=sock, **server_kwargs)
host, port = sock.getsockname()
self.port = port
if uri.startswith(
("http:", "https:", "ftp:", "ftps://", "//", "ws:", "wss:")
):
url = uri
else:
uri = uri if uri.startswith("/") else f"/{uri}"
scheme = "ws" if method == "websocket" else "http"
url = f"{scheme}://{host}:{port}{uri}"
# Tests construct URLs using PORT = None, which means random port not
# known until this function is called, so fix that here
url = url.replace(":None/", f":{port}/")
@self.app.listener("after_server_start")
async def _collect_response(sanic, loop):
try:
response = await self._local_request(
method, url, *request_args, **request_kwargs
)
results[-1] = response
except Exception as e:
logger.exception("Exception")
exceptions.append(e)
self.app.stop()
self.app.run(debug=debug, **server_kwargs)
self.app.listeners["after_server_start"].pop()
if exceptions:
raise ValueError(f"Exception during request: {exceptions}")
if gather_request:
try:
request, response = results
return request, response
except BaseException: # noqa
raise ValueError(
f"Request and response object expected, got ({results})"
)
else:
try:
return results[-1]
except BaseException: # noqa
raise ValueError(f"Request object expected, got ({results})")
def request(self, *args, **kwargs):
return self._sanic_endpoint_test("request", *args, **kwargs)
def get(self, *args, **kwargs):
return self._sanic_endpoint_test("get", *args, **kwargs)
def post(self, *args, **kwargs):
return self._sanic_endpoint_test("post", *args, **kwargs)
def put(self, *args, **kwargs):
return self._sanic_endpoint_test("put", *args, **kwargs)
def delete(self, *args, **kwargs):
return self._sanic_endpoint_test("delete", *args, **kwargs)
def patch(self, *args, **kwargs):
return self._sanic_endpoint_test("patch", *args, **kwargs)
def options(self, *args, **kwargs):
return self._sanic_endpoint_test("options", *args, **kwargs)
def head(self, *args, **kwargs):
return self._sanic_endpoint_test("head", *args, **kwargs)
def websocket(self, *args, **kwargs):
return self._sanic_endpoint_test("websocket", *args, **kwargs)
class TestASGIApp(ASGIApp):
async def __call__(self):
await super().__call__()
return self.request
async def app_call_with_return(self, scope, receive, send):
asgi_app = await TestASGIApp.create(self, scope, receive, send)
return await asgi_app()
class SanicASGITestClient(httpx.AsyncClient):
def __init__(
self,
app,
base_url: str = ASGI_BASE_URL,
suppress_exceptions: bool = False,
) -> None:
app.__class__.__call__ = app_call_with_return
app.asgi = True
self.app = app
transport = httpx.ASGITransport(app=app, client=(ASGI_HOST, ASGI_PORT))
super().__init__(transport=transport, base_url=base_url)
self.last_request = None
def _collect_request(request):
self.last_request = request
@app.listener("after_server_start")
def _start_test_mode(sanic, *args, **kwargs):
sanic.test_mode = True
@app.listener("before_server_end")
def _end_test_mode(sanic, *args, **kwargs):
sanic.test_mode = False
app.request_middleware.appendleft(_collect_request)
async def request(self, method, url, gather_request=True, *args, **kwargs):
self.gather_request = gather_request
response = await super().request(method, url, *args, **kwargs)
response.status = response.status_code
response.body = response.content
response.content_type = response.headers.get("content-type")
return self.last_request, response
async def websocket(self, uri, subprotocols=None, *args, **kwargs):
scheme = "ws"
path = uri
root_path = f"{scheme}://{ASGI_HOST}"
headers = kwargs.get("headers", {})
headers.setdefault("connection", "upgrade")
headers.setdefault("sec-websocket-key", "testserver==")
headers.setdefault("sec-websocket-version", "13")
if subprotocols is not None:
headers.setdefault(
"sec-websocket-protocol", ", ".join(subprotocols)
)
scope = {
"type": "websocket",
"asgi": {"version": "3.0"},
"http_version": "1.1",
"headers": [map(lambda y: y.encode(), x) for x in headers.items()],
"scheme": scheme,
"root_path": root_path,
"path": path,
"query_string": b"",
}
async def receive():
return {}
async def send(message):
pass
await self.app(scope, receive, send)
return None, {}

View File

@ -1,9 +1,13 @@
import types
from importlib.util import module_from_spec, spec_from_file_location from importlib.util import module_from_spec, spec_from_file_location
from os import environ as os_environ from os import environ as os_environ
from pathlib import Path
from re import findall as re_findall from re import findall as re_findall
from typing import Union from typing import Union
from .exceptions import LoadFileException from sanic.exceptions import LoadFileException, PyFileError
from sanic.helpers import import_string
def str_to_bool(val: str) -> bool: def str_to_bool(val: str) -> bool:
@ -39,7 +43,7 @@ def str_to_bool(val: str) -> bool:
def load_module_from_file_location( def load_module_from_file_location(
location: Union[bytes, str], encoding: str = "utf8", *args, **kwargs location: Union[bytes, str, Path], encoding: str = "utf8", *args, **kwargs
): ):
"""Returns loaded module provided as a file path. """Returns loaded module provided as a file path.
@ -67,17 +71,20 @@ def load_module_from_file_location(
"/some/path/${some_env_var}" "/some/path/${some_env_var}"
) )
""" """
# 1) Parse location.
if isinstance(location, bytes): if isinstance(location, bytes):
location = location.decode(encoding) location = location.decode(encoding)
if isinstance(location, Path) or "/" in location or "$" in location:
if not isinstance(location, Path):
# A) Check if location contains any environment variables # A) Check if location contains any environment variables
# in format ${some_env_var}. # in format ${some_env_var}.
env_vars_in_location = set(re_findall(r"\${(.+?)}", location)) env_vars_in_location = set(re_findall(r"\${(.+?)}", location))
# B) Check these variables exists in environment. # B) Check these variables exists in environment.
not_defined_env_vars = env_vars_in_location.difference(os_environ.keys()) not_defined_env_vars = env_vars_in_location.difference(
os_environ.keys()
)
if not_defined_env_vars: if not_defined_env_vars:
raise LoadFileException( raise LoadFileException(
"The following environment variables are not set: " "The following environment variables are not set: "
@ -86,14 +93,39 @@ def load_module_from_file_location(
# C) Substitute them in location. # C) Substitute them in location.
for env_var in env_vars_in_location: for env_var in env_vars_in_location:
location = location.replace("${" + env_var + "}", os_environ[env_var]) location = location.replace(
"${" + env_var + "}", os_environ[env_var]
)
# 2) Load and return module. location = str(location)
if ".py" in location:
name = location.split("/")[-1].split(".")[ name = location.split("/")[-1].split(".")[
0 0
] # get just the file name without path and .py extension ] # get just the file name without path and .py extension
_mod_spec = spec_from_file_location(name, location, *args, **kwargs) _mod_spec = spec_from_file_location(
name, location, *args, **kwargs
)
module = module_from_spec(_mod_spec) module = module_from_spec(_mod_spec)
_mod_spec.loader.exec_module(module) # type: ignore _mod_spec.loader.exec_module(module) # type: ignore
else:
module = types.ModuleType("config")
module.__file__ = str(location)
try:
with open(location) as config_file:
exec( # nosec
compile(config_file.read(), location, "exec"),
module.__dict__,
)
except IOError as e:
e.strerror = "Unable to load configuration file (e.strerror)"
raise
except Exception as e:
raise PyFileError(location) from e
return module return module
else:
try:
return import_string(location)
except ValueError:
raise IOError("Unable to load configuration %s" % str(location))

View File

@ -89,22 +89,20 @@ requirements = [
"aiofiles>=0.6.0", "aiofiles>=0.6.0",
"websockets>=8.1,<9.0", "websockets>=8.1,<9.0",
"multidict>=5.0,<6.0", "multidict>=5.0,<6.0",
"httpx==0.15.4",
] ]
tests_require = [ tests_require = [
"sanic-testing",
"pytest==5.2.1", "pytest==5.2.1",
"multidict>=5.0,<6.0", "multidict>=5.0,<6.0",
"gunicorn==20.0.4", "gunicorn==20.0.4",
"pytest-cov", "pytest-cov",
"httpcore==0.11.*",
"beautifulsoup4", "beautifulsoup4",
uvloop, uvloop,
ujson, ujson,
"pytest-sanic", "pytest-sanic",
"pytest-sugar", "pytest-sugar",
"pytest-benchmark", "pytest-benchmark",
"pytest-dependency",
] ]
docs_require = [ docs_require = [

View File

@ -6,6 +6,8 @@ import uuid
import pytest import pytest
from sanic_testing import TestManager
from sanic import Sanic from sanic import Sanic
from sanic.router import RouteExists, Router from sanic.router import RouteExists, Router
@ -17,6 +19,11 @@ if sys.platform in ["win32", "cygwin"]:
collect_ignore = ["test_worker.py"] collect_ignore = ["test_worker.py"]
@pytest.fixture
def caplog(caplog):
yield caplog
async def _handler(request): async def _handler(request):
""" """
Dummy placeholder method used for route resolver when creating a new Dummy placeholder method used for route resolver when creating a new
@ -127,6 +134,8 @@ def url_param_generator():
return TYPE_TO_GENERATOR_MAP return TYPE_TO_GENERATOR_MAP
@pytest.fixture @pytest.fixture(scope="function")
def app(request): def app(request):
return Sanic(request.node.name) app = Sanic(request.node.name)
# TestManager(app)
return app

View File

@ -41,8 +41,7 @@ def transport(message_stack, receive, send):
@pytest.fixture @pytest.fixture
# @pytest.mark.asyncio def protocol(transport):
def protocol(transport, loop):
return transport.get_protocol() return transport.get_protocol()

View File

@ -1,5 +0,0 @@
from sanic.testing import SanicASGITestClient
def test_asgi_client_instantiation(app):
assert isinstance(app.asgi_client, SanicASGITestClient)

View File

@ -22,24 +22,41 @@ class ConfigTest:
not_for_config = "should not be used" not_for_config = "should not be used"
CONFIG_VALUE = "should be used" CONFIG_VALUE = "should be used"
@property
def ANOTHER_VALUE(self):
return self.CONFIG_VALUE
@property
def another_not_for_config(self):
return self.not_for_config
def test_load_from_object(app): def test_load_from_object(app):
app.config.from_object(ConfigTest) app.config.load(ConfigTest)
assert "CONFIG_VALUE" in app.config assert "CONFIG_VALUE" in app.config
assert app.config.CONFIG_VALUE == "should be used" assert app.config.CONFIG_VALUE == "should be used"
assert "not_for_config" not in app.config assert "not_for_config" not in app.config
def test_load_from_object_string(app): def test_load_from_object_string(app):
app.config.from_object("test_config.ConfigTest") app.config.load("test_config.ConfigTest")
assert "CONFIG_VALUE" in app.config assert "CONFIG_VALUE" in app.config
assert app.config.CONFIG_VALUE == "should be used" assert app.config.CONFIG_VALUE == "should be used"
assert "not_for_config" not in app.config assert "not_for_config" not in app.config
def test_load_from_instance(app):
app.config.load(ConfigTest())
assert "CONFIG_VALUE" in app.config
assert app.config.CONFIG_VALUE == "should be used"
assert app.config.ANOTHER_VALUE == "should be used"
assert "not_for_config" not in app.config
assert "another_not_for_config" not in app.config
def test_load_from_object_string_exception(app): def test_load_from_object_string_exception(app):
with pytest.raises(ImportError): with pytest.raises(ImportError):
app.config.from_object("test_config.Config.test") app.config.load("test_config.Config.test")
def test_auto_load_env(): def test_auto_load_env():
@ -52,7 +69,7 @@ def test_auto_load_env():
def test_auto_load_bool_env(): def test_auto_load_bool_env():
environ["SANIC_TEST_ANSWER"] = "True" environ["SANIC_TEST_ANSWER"] = "True"
app = Sanic(name=__name__) app = Sanic(name=__name__)
assert app.config.TEST_ANSWER == True assert app.config.TEST_ANSWER is True
del environ["SANIC_TEST_ANSWER"] del environ["SANIC_TEST_ANSWER"]
@ -95,7 +112,7 @@ def test_load_from_file(app):
) )
with temp_path() as config_path: with temp_path() as config_path:
config_path.write_text(config) config_path.write_text(config)
app.config.from_pyfile(str(config_path)) app.config.load(str(config_path))
assert "VALUE" in app.config assert "VALUE" in app.config
assert app.config.VALUE == "some value" assert app.config.VALUE == "some value"
assert "CONDITIONAL" in app.config assert "CONDITIONAL" in app.config
@ -105,7 +122,7 @@ def test_load_from_file(app):
def test_load_from_missing_file(app): def test_load_from_missing_file(app):
with pytest.raises(IOError): with pytest.raises(IOError):
app.config.from_pyfile("non-existent file") app.config.load("non-existent file")
def test_load_from_envvar(app): def test_load_from_envvar(app):
@ -113,14 +130,14 @@ def test_load_from_envvar(app):
with temp_path() as config_path: with temp_path() as config_path:
config_path.write_text(config) config_path.write_text(config)
environ["APP_CONFIG"] = str(config_path) environ["APP_CONFIG"] = str(config_path)
app.config.from_envvar("APP_CONFIG") app.config.load("${APP_CONFIG}")
assert "VALUE" in app.config assert "VALUE" in app.config
assert app.config.VALUE == "some value" assert app.config.VALUE == "some value"
def test_load_from_missing_envvar(app): def test_load_from_missing_envvar(app):
with pytest.raises(RuntimeError) as e: with pytest.raises(IOError) as e:
app.config.from_envvar("non-existent variable") app.config.load("non-existent variable")
assert str(e.value) == ( assert str(e.value) == (
"The environment variable 'non-existent " "The environment variable 'non-existent "
"variable' is not set and thus configuration " "variable' is not set and thus configuration "
@ -134,7 +151,7 @@ def test_load_config_from_file_invalid_syntax(app):
config_path.write_text(config) config_path.write_text(config)
with pytest.raises(PyFileError): with pytest.raises(PyFileError):
app.config.from_pyfile(config_path) app.config.load(config_path)
def test_overwrite_exisiting_config(app): def test_overwrite_exisiting_config(app):
@ -143,7 +160,7 @@ def test_overwrite_exisiting_config(app):
class Config: class Config:
DEFAULT = 2 DEFAULT = 2
app.config.from_object(Config) app.config.load(Config)
assert app.config.DEFAULT == 2 assert app.config.DEFAULT == 2
@ -153,14 +170,12 @@ def test_overwrite_exisiting_config_ignore_lowercase(app):
class Config: class Config:
default = 2 default = 2
app.config.from_object(Config) app.config.load(Config)
assert app.config.default == 1 assert app.config.default == 1
def test_missing_config(app): def test_missing_config(app):
with pytest.raises( with pytest.raises(AttributeError, match="Config has no 'NON_EXISTENT'"):
AttributeError, match="Config has no 'NON_EXISTENT'"
) as e:
_ = app.config.NON_EXISTENT _ = app.config.NON_EXISTENT
@ -175,7 +190,8 @@ def test_config_defaults():
def test_config_custom_defaults(): def test_config_custom_defaults():
""" """
we should have all the variables from defaults rewriting them with custom defaults passed in we should have all the variables from defaults rewriting them with
custom defaults passed in
Config Config
""" """
custom_defaults = { custom_defaults = {
@ -192,7 +208,8 @@ def test_config_custom_defaults():
def test_config_custom_defaults_with_env(): def test_config_custom_defaults_with_env():
""" """
test that environment variables has higher priority than DEFAULT_CONFIG and passed defaults dict test that environment variables has higher priority than DEFAULT_CONFIG
and passed defaults dict
""" """
custom_defaults = { custom_defaults = {
"REQUEST_MAX_SIZE123": 1, "REQUEST_MAX_SIZE123": 1,
@ -226,22 +243,22 @@ def test_config_custom_defaults_with_env():
def test_config_access_log_passing_in_run(app): def test_config_access_log_passing_in_run(app):
assert app.config.ACCESS_LOG == True assert app.config.ACCESS_LOG is True
@app.listener("after_server_start") @app.listener("after_server_start")
async def _request(sanic, loop): async def _request(sanic, loop):
app.stop() app.stop()
app.run(port=1340, access_log=False) app.run(port=1340, access_log=False)
assert app.config.ACCESS_LOG == False assert app.config.ACCESS_LOG is False
app.run(port=1340, access_log=True) app.run(port=1340, access_log=True)
assert app.config.ACCESS_LOG == True assert app.config.ACCESS_LOG is True
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_config_access_log_passing_in_create_server(app): async def test_config_access_log_passing_in_create_server(app):
assert app.config.ACCESS_LOG == True assert app.config.ACCESS_LOG is True
@app.listener("after_server_start") @app.listener("after_server_start")
async def _request(sanic, loop): async def _request(sanic, loop):
@ -250,24 +267,51 @@ async def test_config_access_log_passing_in_create_server(app):
await app.create_server( await app.create_server(
port=1341, access_log=False, return_asyncio_server=True port=1341, access_log=False, return_asyncio_server=True
) )
assert app.config.ACCESS_LOG == False assert app.config.ACCESS_LOG is False
await app.create_server( await app.create_server(
port=1342, access_log=True, return_asyncio_server=True port=1342, access_log=True, return_asyncio_server=True
) )
assert app.config.ACCESS_LOG == True assert app.config.ACCESS_LOG is True
def test_config_rewrite_keep_alive(): def test_config_rewrite_keep_alive():
config = Config() config = Config()
assert config.KEEP_ALIVE == DEFAULT_CONFIG["KEEP_ALIVE"] assert config.KEEP_ALIVE == DEFAULT_CONFIG["KEEP_ALIVE"]
config = Config(keep_alive=True) config = Config(keep_alive=True)
assert config.KEEP_ALIVE == True assert config.KEEP_ALIVE is True
config = Config(keep_alive=False) config = Config(keep_alive=False)
assert config.KEEP_ALIVE == False assert config.KEEP_ALIVE is False
# use defaults # use defaults
config = Config(defaults={"KEEP_ALIVE": False}) config = Config(defaults={"KEEP_ALIVE": False})
assert config.KEEP_ALIVE == False assert config.KEEP_ALIVE is False
config = Config(defaults={"KEEP_ALIVE": True}) config = Config(defaults={"KEEP_ALIVE": True})
assert config.KEEP_ALIVE == True assert config.KEEP_ALIVE is True
_test_setting_as_dict = {"TEST_SETTING_VALUE": 1}
_test_setting_as_class = type("C", (), {"TEST_SETTING_VALUE": 1})
_test_setting_as_module = str(
Path(__file__).parent / "static/app_test_config.py"
)
@pytest.mark.parametrize(
"conf_object",
[
_test_setting_as_dict,
_test_setting_as_class,
_test_setting_as_module,
],
ids=["from_dict", "from_class", "from_file"],
)
def test_update(app, conf_object):
app.update_config(conf_object)
assert app.config["TEST_SETTING_VALUE"] == 1
def test_update_from_lowercase_key(app):
d = {"test_setting_value": 1}
app.update_config(d)
assert "test_setting_value" not in app.config

View File

@ -162,7 +162,7 @@ def test_cookie_set_same_key(app):
assert response.cookies["test"] == "pass" assert response.cookies["test"] == "pass"
@pytest.mark.parametrize("max_age", ["0", 30, 30.0, 30.1, "30", "test"]) @pytest.mark.parametrize("max_age", ["0", 30, "30"])
def test_cookie_max_age(app, max_age): def test_cookie_max_age(app, max_age):
cookies = {"test": "wait"} cookies = {"test": "wait"}
@ -204,6 +204,23 @@ def test_cookie_max_age(app, max_age):
assert cookie is None assert cookie is None
@pytest.mark.parametrize("max_age", [30.0, 30.1, "test"])
def test_cookie_bad_max_age(app, max_age):
cookies = {"test": "wait"}
@app.get("/")
def handler(request):
response = text("pass")
response.cookies["test"] = "pass"
response.cookies["test"]["max-age"] = max_age
return response
request, response = app.test_client.get(
"/", cookies=cookies, raw_cookies=True
)
assert response.status == 500
@pytest.mark.parametrize( @pytest.mark.parametrize(
"expires", [datetime.utcnow() + timedelta(seconds=60)] "expires", [datetime.utcnow() + timedelta(seconds=60)]
) )

View File

@ -8,10 +8,11 @@ import httpcore
import httpx import httpx
import pytest import pytest
from sanic_testing.testing import HOST, SanicTestClient
from sanic import Sanic, server from sanic import Sanic, server
from sanic.compat import OS_IS_WINDOWS from sanic.compat import OS_IS_WINDOWS
from sanic.response import text from sanic.response import text
from sanic.testing import HOST, SanicTestClient
CONFIG_FOR_TESTS = {"KEEP_ALIVE_TIMEOUT": 2, "KEEP_ALIVE": True} CONFIG_FOR_TESTS = {"KEEP_ALIVE_TIMEOUT": 2, "KEEP_ALIVE": True}

View File

@ -1,38 +0,0 @@
from pathlib import Path
from types import ModuleType
import pytest
from sanic.exceptions import LoadFileException
from sanic.utils import load_module_from_file_location
@pytest.fixture
def loaded_module_from_file_location():
return load_module_from_file_location(
str(Path(__file__).parent / "static" / "app_test_config.py")
)
@pytest.mark.dependency(name="test_load_module_from_file_location")
def test_load_module_from_file_location(loaded_module_from_file_location):
assert isinstance(loaded_module_from_file_location, ModuleType)
@pytest.mark.dependency(depends=["test_load_module_from_file_location"])
def test_loaded_module_from_file_location_name(
loaded_module_from_file_location,
):
name = loaded_module_from_file_location.__name__
if "C:\\" in name:
name = name.split("\\")[-1]
assert name == "app_test_config"
def test_load_module_from_file_location_with_non_existing_env_variable():
with pytest.raises(
LoadFileException,
match="The following environment variables are not set: MuuMilk",
):
load_module_from_file_location("${MuuMilk}")

View File

@ -8,13 +8,14 @@ from io import StringIO
import pytest import pytest
from sanic_testing.testing import SanicTestClient
import sanic import sanic
from sanic import Sanic from sanic import Sanic
from sanic.compat import OS_IS_WINDOWS from sanic.compat import OS_IS_WINDOWS
from sanic.log import LOGGING_CONFIG_DEFAULTS, logger from sanic.log import LOGGING_CONFIG_DEFAULTS, logger
from sanic.response import text from sanic.response import text
from sanic.testing import SanicTestClient
logging_format = """module: %(module)s; \ logging_format = """module: %(module)s; \
@ -34,6 +35,7 @@ def test_log(app):
logging.basicConfig( logging.basicConfig(
format=logging_format, level=logging.DEBUG, stream=log_stream format=logging_format, level=logging.DEBUG, stream=log_stream
) )
logging.getLogger("asyncio").setLevel(logging.WARNING)
log = logging.getLogger() log = logging.getLogger()
rand_string = str(uuid.uuid4()) rand_string = str(uuid.uuid4())

View File

@ -1,16 +1,9 @@
import asyncio import asyncio
import logging import logging
from sanic_testing.testing import PORT
from sanic.config import BASE_LOGO from sanic.config import BASE_LOGO
from sanic.testing import PORT
try:
import uvloop # noqa
ROW = 0
except BaseException:
ROW = 1
def test_logo_base(app, caplog): def test_logo_base(app, caplog):
@ -28,8 +21,8 @@ def test_logo_base(app, caplog):
loop.run_until_complete(_server.wait_closed()) loop.run_until_complete(_server.wait_closed())
app.stop() app.stop()
assert caplog.record_tuples[ROW][1] == logging.DEBUG assert caplog.record_tuples[0][1] == logging.DEBUG
assert caplog.record_tuples[ROW][2] == BASE_LOGO assert caplog.record_tuples[0][2] == BASE_LOGO
def test_logo_false(app, caplog): def test_logo_false(app, caplog):
@ -49,8 +42,8 @@ def test_logo_false(app, caplog):
loop.run_until_complete(_server.wait_closed()) loop.run_until_complete(_server.wait_closed())
app.stop() app.stop()
banner, port = caplog.record_tuples[ROW][2].rsplit(":", 1) banner, port = caplog.record_tuples[0][2].rsplit(":", 1)
assert caplog.record_tuples[ROW][1] == logging.INFO assert caplog.record_tuples[0][1] == logging.INFO
assert banner == "Goin' Fast @ http://127.0.0.1" assert banner == "Goin' Fast @ http://127.0.0.1"
assert int(port) > 0 assert int(port) > 0
@ -72,8 +65,8 @@ def test_logo_true(app, caplog):
loop.run_until_complete(_server.wait_closed()) loop.run_until_complete(_server.wait_closed())
app.stop() app.stop()
assert caplog.record_tuples[ROW][1] == logging.DEBUG assert caplog.record_tuples[0][1] == logging.DEBUG
assert caplog.record_tuples[ROW][2] == BASE_LOGO assert caplog.record_tuples[0][2] == BASE_LOGO
def test_logo_custom(app, caplog): def test_logo_custom(app, caplog):
@ -93,5 +86,5 @@ def test_logo_custom(app, caplog):
loop.run_until_complete(_server.wait_closed()) loop.run_until_complete(_server.wait_closed())
app.stop() app.stop()
assert caplog.record_tuples[ROW][1] == logging.DEBUG assert caplog.record_tuples[0][1] == logging.DEBUG
assert caplog.record_tuples[ROW][2] == "My Custom Logo" assert caplog.record_tuples[0][2] == "My Custom Logo"

View File

@ -5,9 +5,10 @@ import signal
import pytest import pytest
from sanic_testing.testing import HOST, PORT
from sanic import Blueprint from sanic import Blueprint
from sanic.response import text from sanic.response import text
from sanic.testing import HOST, PORT
@pytest.mark.skipif( @pytest.mark.skipif(

76
tests/test_request.py Normal file
View File

@ -0,0 +1,76 @@
from unittest.mock import Mock
from uuid import UUID, uuid4
import pytest
from sanic import Sanic, response
from sanic.request import Request, uuid
def test_no_request_id_not_called(monkeypatch):
monkeypatch.setattr(uuid, "uuid4", Mock())
request = Request(b"/", {}, None, "GET", None, None)
assert request._id is None
uuid.uuid4.assert_not_called()
def test_request_id_generates_from_request(monkeypatch):
monkeypatch.setattr(Request, "generate_id", Mock())
Request.generate_id.return_value = 1
request = Request(b"/", {}, None, "GET", None, Mock())
for _ in range(10):
request.id
Request.generate_id.assert_called_once_with(request)
def test_request_id_defaults_uuid():
request = Request(b"/", {}, None, "GET", None, Mock())
assert isinstance(request.id, UUID)
# Makes sure that it has been cached and not called multiple times
assert request.id == request.id == request._id
@pytest.mark.parametrize(
"request_id,expected_type",
(
(99, int),
(uuid4(), UUID),
("foo", str),
),
)
def test_request_id(request_id, expected_type):
app = Sanic("req-generator")
@app.get("/")
async def get(request):
return response.empty()
request, _ = app.test_client.get(
"/", headers={"X-REQUEST-ID": f"{request_id}"}
)
assert request.id == request_id
assert type(request.id) == expected_type
def test_custom_generator():
REQUEST_ID = 99
class FooRequest(Request):
@classmethod
def generate_id(cls, request):
return int(request.headers["some-other-request-id"]) * 2
app = Sanic("req-generator", request_class=FooRequest)
@app.get("/")
async def get(request):
return response.empty()
request, _ = app.test_client.get(
"/", headers={"SOME-OTHER-REQUEST-ID": f"{REQUEST_ID}"}
)
assert request.id == REQUEST_ID * 2

View File

@ -16,10 +16,10 @@ from httpcore._async.connection_pool import ResponseByteStream
from httpcore._exceptions import LocalProtocolError, UnsupportedProtocol from httpcore._exceptions import LocalProtocolError, UnsupportedProtocol
from httpcore._types import TimeoutDict from httpcore._types import TimeoutDict
from httpcore._utils import url_to_origin from httpcore._utils import url_to_origin
from sanic_testing.testing import SanicTestClient
from sanic import Sanic from sanic import Sanic
from sanic.response import text from sanic.response import text
from sanic.testing import SanicTestClient
class DelayableHTTPConnection(httpcore._async.connection.AsyncHTTPConnection): class DelayableHTTPConnection(httpcore._async.connection.AsyncHTTPConnection):

View File

@ -8,11 +8,7 @@ from urllib.parse import urlparse
import pytest import pytest
from sanic import Blueprint, Sanic from sanic_testing.testing import (
from sanic.exceptions import ServerError
from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, Request, RequestParameters
from sanic.response import html, json, text
from sanic.testing import (
ASGI_BASE_URL, ASGI_BASE_URL,
ASGI_HOST, ASGI_HOST,
ASGI_PORT, ASGI_PORT,
@ -21,6 +17,11 @@ from sanic.testing import (
SanicTestClient, SanicTestClient,
) )
from sanic import Blueprint, Sanic
from sanic.exceptions import ServerError
from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, Request, RequestParameters
from sanic.response import html, json, text
# ------------------------------------------------------------ # # ------------------------------------------------------------ #
# GET # GET

View File

@ -12,6 +12,7 @@ from urllib.parse import unquote
import pytest import pytest
from aiofiles import os as async_os from aiofiles import os as async_os
from sanic_testing.testing import HOST, PORT
from sanic.response import ( from sanic.response import (
HTTPResponse, HTTPResponse,
@ -25,7 +26,6 @@ from sanic.response import (
text, text,
) )
from sanic.server import HttpProtocol from sanic.server import HttpProtocol
from sanic.testing import HOST, PORT
JSON_DATA = {"ok": True} JSON_DATA = {"ok": True}

View File

@ -2,11 +2,12 @@ import asyncio
import pytest import pytest
from sanic_testing.testing import SanicTestClient
from sanic import Sanic from sanic import Sanic
from sanic.constants import HTTP_METHODS from sanic.constants import HTTP_METHODS
from sanic.response import json, text from sanic.response import json, text
from sanic.router import ParameterNameConflicts, RouteDoesNotExist, RouteExists from sanic.router import ParameterNameConflicts, RouteDoesNotExist, RouteExists
from sanic.testing import SanicTestClient
# ------------------------------------------------------------ # # ------------------------------------------------------------ #
@ -479,21 +480,21 @@ def test_websocket_route_with_subprotocols(app):
results.append(ws.subprotocol) results.append(ws.subprotocol)
assert ws.subprotocol is not None assert ws.subprotocol is not None
request, response = app.test_client.websocket("/ws", subprotocols=["bar"]) _, response = SanicTestClient(app).websocket("/ws", subprotocols=["bar"])
assert response.opened is True assert response.opened is True
assert results == ["bar"] assert results == ["bar"]
request, response = app.test_client.websocket( _, response = SanicTestClient(app).websocket(
"/ws", subprotocols=["bar", "foo"] "/ws", subprotocols=["bar", "foo"]
) )
assert response.opened is True assert response.opened is True
assert results == ["bar", "bar"] assert results == ["bar", "bar"]
request, response = app.test_client.websocket("/ws", subprotocols=["baz"]) _, response = SanicTestClient(app).websocket("/ws", subprotocols=["baz"])
assert response.opened is True assert response.opened is True
assert results == ["bar", "bar", None] assert results == ["bar", "bar", None]
request, response = app.test_client.websocket("/ws") _, response = SanicTestClient(app).websocket("/ws")
assert response.opened is True assert response.opened is True
assert results == ["bar", "bar", None, None] assert results == ["bar", "bar", None, None]

View File

@ -6,7 +6,7 @@ from socket import socket
import pytest import pytest
from sanic.testing import HOST, PORT from sanic_testing.testing import HOST, PORT
AVAILABLE_LISTENERS = [ AVAILABLE_LISTENERS = [

View File

@ -7,9 +7,10 @@ from unittest.mock import MagicMock
import pytest import pytest
from sanic_testing.testing import HOST, PORT
from sanic.compat import ctrlc_workaround_for_windows from sanic.compat import ctrlc_workaround_for_windows
from sanic.response import HTTPResponse from sanic.response import HTTPResponse
from sanic.testing import HOST, PORT
async def stop(app, loop): async def stop(app, loop):

View File

@ -1,6 +1,6 @@
import inspect import inspect
import os import os
from pathlib import Path
from time import gmtime, strftime from time import gmtime, strftime
import pytest import pytest
@ -76,6 +76,41 @@ def test_static_file(app, static_file_directory, file_name):
assert response.body == get_file_content(static_file_directory, file_name) assert response.body == get_file_content(static_file_directory, file_name)
@pytest.mark.parametrize(
"file_name",
["test.file", "decode me.txt", "python.png", "symlink", "hard_link"],
)
def test_static_file_pathlib(app, static_file_directory, file_name):
file_path = Path(get_file_path(static_file_directory, file_name))
app.static("/testing.file", file_path)
request, response = app.test_client.get("/testing.file")
assert response.status == 200
assert response.body == get_file_content(static_file_directory, file_name)
@pytest.mark.parametrize(
"file_name",
[b"test.file", b"decode me.txt", b"python.png"],
)
def test_static_file_bytes(app, static_file_directory, file_name):
bsep = os.path.sep.encode('utf-8')
file_path = static_file_directory.encode('utf-8') + bsep + file_name
app.static("/testing.file", file_path)
request, response = app.test_client.get("/testing.file")
assert response.status == 200
@pytest.mark.parametrize(
"file_name",
[dict(), list(), object()],
)
def test_static_file_invalid_path(app, static_file_directory, file_name):
with pytest.raises(ValueError):
app.static("/testing.file", file_name)
request, response = app.test_client.get("/testing.file")
assert response.status == 404
@pytest.mark.parametrize("file_name", ["test.html"]) @pytest.mark.parametrize("file_name", ["test.html"])
def test_static_file_content_type(app, static_file_directory, file_name): def test_static_file_content_type(app, static_file_directory, file_name):
app.static( app.static(

View File

@ -1,5 +1,6 @@
from sanic_testing.testing import PORT, SanicTestClient
from sanic.response import json, text from sanic.response import json, text
from sanic.testing import PORT, SanicTestClient
# ------------------------------------------------------------ # # ------------------------------------------------------------ #

View File

@ -1,36 +0,0 @@
from pathlib import Path
import pytest
_test_setting_as_dict = {"TEST_SETTING_VALUE": 1}
_test_setting_as_class = type("C", (), {"TEST_SETTING_VALUE": 1})
_test_setting_as_module = str(
Path(__file__).parent / "static/app_test_config.py"
)
@pytest.mark.parametrize(
"conf_object",
[
_test_setting_as_dict,
_test_setting_as_class,
pytest.param(
_test_setting_as_module,
marks=pytest.mark.dependency(
depends=["test_load_module_from_file_location"],
scope="session",
),
),
],
ids=["from_dict", "from_class", "from_file"],
)
def test_update(app, conf_object):
app.update_config(conf_object)
assert app.config["TEST_SETTING_VALUE"] == 1
def test_update_from_lowercase_key(app):
d = {"test_setting_value": 1}
app.update_config(d)
assert "test_setting_value" not in app.config

View File

@ -4,11 +4,12 @@ from urllib.parse import parse_qsl, urlsplit
import pytest as pytest import pytest as pytest
from sanic_testing.testing import HOST as test_host
from sanic_testing.testing import PORT as test_port
from sanic.blueprints import Blueprint from sanic.blueprints import Blueprint
from sanic.exceptions import URLBuildError from sanic.exceptions import URLBuildError
from sanic.response import text from sanic.response import text
from sanic.testing import HOST as test_host
from sanic.testing import PORT as test_port
from sanic.views import HTTPMethodView from sanic.views import HTTPMethodView

View File

@ -1,5 +1,7 @@
import asyncio import asyncio
from sanic_testing.testing import SanicTestClient
from sanic.blueprints import Blueprint from sanic.blueprints import Blueprint
@ -48,14 +50,14 @@ def test_websocket_bp_route_name(app):
uri = app.url_for("test_bp.test_route") uri = app.url_for("test_bp.test_route")
assert uri == "/bp/route" assert uri == "/bp/route"
request, response = app.test_client.websocket(uri) request, response = SanicTestClient(app).websocket(uri)
assert response.opened is True assert response.opened is True
assert event.is_set() assert event.is_set()
event.clear() event.clear()
uri = app.url_for("test_bp.test_route2") uri = app.url_for("test_bp.test_route2")
assert uri == "/bp/route2" assert uri == "/bp/route2"
request, response = app.test_client.websocket(uri) request, response = SanicTestClient(app).websocket(uri)
assert response.opened is True assert response.opened is True
assert event.is_set() assert event.is_set()

50
tests/test_utils.py Normal file
View File

@ -0,0 +1,50 @@
from os import environ
from pathlib import Path
from types import ModuleType
import pytest
from sanic.exceptions import LoadFileException
from sanic.utils import load_module_from_file_location
@pytest.mark.parametrize(
"location",
(
Path(__file__).parent / "static" / "app_test_config.py",
str(Path(__file__).parent / "static" / "app_test_config.py"),
str(Path(__file__).parent / "static" / "app_test_config.py").encode(),
),
)
def test_load_module_from_file_location(location):
module = load_module_from_file_location(location)
assert isinstance(module, ModuleType)
def test_loaded_module_from_file_location_name():
module = load_module_from_file_location(
str(Path(__file__).parent / "static" / "app_test_config.py")
)
name = module.__name__
if "C:\\" in name:
name = name.split("\\")[-1]
assert name == "app_test_config"
def test_load_module_from_file_location_with_non_existing_env_variable():
with pytest.raises(
LoadFileException,
match="The following environment variables are not set: MuuMilk",
):
load_module_from_file_location("${MuuMilk}")
def test_load_module_from_file_location_using_env():
environ["APP_TEST_CONFIG"] = "static/app_test_config.py"
location = str(Path(__file__).parent / "${APP_TEST_CONFIG}")
module = load_module_from_file_location(location)
assert isinstance(module, ModuleType)

View File

@ -7,15 +7,13 @@ setenv =
{py36,py37,py38,py39,pyNightly}-no-ext: SANIC_NO_UJSON=1 {py36,py37,py38,py39,pyNightly}-no-ext: SANIC_NO_UJSON=1
{py36,py37,py38,py39,pyNightly}-no-ext: SANIC_NO_UVLOOP=1 {py36,py37,py38,py39,pyNightly}-no-ext: SANIC_NO_UVLOOP=1
deps = deps =
sanic-testing==0.1.2
coverage==5.3 coverage==5.3
pytest==5.2.1 pytest==5.2.1
pytest-cov pytest-cov
pytest-sanic pytest-sanic
pytest-sugar pytest-sugar
pytest-benchmark pytest-benchmark
pytest-dependency
httpcore==0.11.*
httpx==0.15.4
chardet==3.* chardet==3.*
beautifulsoup4 beautifulsoup4
gunicorn==20.0.4 gunicorn==20.0.4