Merge pull request #2010 from sanic-org/sanic-routing

Sanic routing
This commit is contained in:
Adam Hopkins 2021-02-16 10:09:12 +02:00 committed by GitHub
commit 8f8c00a99d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 2360 additions and 2113 deletions

View File

@ -63,7 +63,7 @@ ifdef include_tests
isort -rc sanic tests
else
$(info Sorting Imports)
isort -rc sanic tests
isort -rc sanic tests --profile=black
endif
endif
@ -71,7 +71,7 @@ black:
black --config ./.black.toml sanic tests
fix-import: black
isort sanic tests
isort sanic tests --profile=black
docs-clean:

File diff suppressed because it is too large Load Diff

View File

@ -131,6 +131,7 @@ class Lifespan:
in sequence since the ASGI lifespan protocol only supports a single
startup event.
"""
self.asgi_app.sanic_app.router.finalize()
listeners = self.asgi_app.sanic_app.listeners.get(
"before_server_start", []
) + self.asgi_app.sanic_app.listeners.get("after_server_start", [])

36
sanic/base.py Normal file
View File

@ -0,0 +1,36 @@
from sanic.mixins.exceptions import ExceptionMixin
from sanic.mixins.listeners import ListenerMixin
from sanic.mixins.middleware import MiddlewareMixin
from sanic.mixins.routes import RouteMixin
class Base(type):
def __new__(cls, name, bases, attrs):
init = attrs.get("__init__")
def __init__(self, *args, **kwargs):
nonlocal init
nonlocal name
bases = [
b for base in type(self).__bases__ for b in base.__bases__
]
for base in bases:
base.__init__(self, *args, **kwargs)
if init:
init(self, *args, **kwargs)
attrs["__init__"] = __init__
return type.__new__(cls, name, bases, attrs)
class BaseSanic(
RouteMixin,
MiddlewareMixin,
ListenerMixin,
ExceptionMixin,
metaclass=Base,
):
...

View File

@ -112,10 +112,13 @@ class BlueprintGroup(MutableSequence):
:param kwargs: Optional Keyword arg to use with Middleware
:return: Partial function to apply the middleware
"""
kwargs["bp_group"] = True
def register_middleware_for_blueprints(fn):
for blueprint in self.blueprints:
blueprint.middleware(fn, *args, **kwargs)
if args and callable(args[0]):
fn = args[0]
args = list(args)[1:]
return register_middleware_for_blueprints(fn)
return register_middleware_for_blueprints

View File

@ -1,36 +1,11 @@
from collections import defaultdict, namedtuple
from collections import defaultdict
from sanic.base import BaseSanic
from sanic.blueprint_group import BlueprintGroup
from sanic.constants import HTTP_METHODS
from sanic.views import CompositionView
from sanic.models.futures import FutureRoute, FutureStatic
FutureRoute = namedtuple(
"FutureRoute",
[
"handler",
"uri",
"methods",
"host",
"strict_slashes",
"stream",
"version",
"name",
],
)
FutureListener = namedtuple(
"FutureListener", ["handler", "uri", "methods", "host"]
)
FutureMiddleware = namedtuple(
"FutureMiddleware", ["middleware", "args", "kwargs"]
)
FutureException = namedtuple("FutureException", ["handler", "args", "kwargs"])
FutureStatic = namedtuple(
"FutureStatic", ["uri", "file_or_directory", "args", "kwargs"]
)
class Blueprint:
class Blueprint(BaseSanic):
def __init__(
self,
name,
@ -64,6 +39,26 @@ class Blueprint:
self.version = version
self.strict_slashes = strict_slashes
def route(self, *args, **kwargs):
kwargs["apply"] = False
return super().route(*args, **kwargs)
def static(self, *args, **kwargs):
kwargs["apply"] = False
return super().static(*args, **kwargs)
def middleware(self, *args, **kwargs):
kwargs["apply"] = False
return super().middleware(*args, **kwargs)
def listener(self, *args, **kwargs):
kwargs["apply"] = False
return super().listener(*args, **kwargs)
def exception(self, *args, **kwargs):
kwargs["apply"] = False
return super().exception(*args, **kwargs)
@staticmethod
def group(*blueprints, url_prefix=""):
"""
@ -107,474 +102,62 @@ class Blueprint:
routes = []
# Routes
for future in self.routes:
for future in self._future_routes:
# attach the blueprint name to the handler so that it can be
# prefixed properly in the router
future.handler.__blueprintname__ = self.name
# Prepend the blueprint URI prefix if available
uri = url_prefix + future.uri if url_prefix else future.uri
version = future.version or self.version
strict_slashes = (
self.strict_slashes
if future.strict_slashes is None
and self.strict_slashes is not None
else future.strict_slashes
)
name = app._generate_name(future.name)
_routes, _ = app.route(
uri=uri[1:] if uri.startswith("//") else uri,
methods=future.methods,
host=future.host or self.host,
strict_slashes=future.strict_slashes,
stream=future.stream,
version=version,
name=future.name,
)(future.handler)
if _routes:
routes += _routes
apply_route = FutureRoute(
future.handler,
uri[1:] if uri.startswith("//") else uri,
future.methods,
future.host or self.host,
strict_slashes,
future.stream,
future.version or self.version,
name,
future.ignore_body,
future.websocket,
future.subprotocols,
future.unquote,
future.static,
)
for future in self.websocket_routes:
# attach the blueprint name to the handler so that it can be
# prefixed properly in the router
future.handler.__blueprintname__ = self.name
# Prepend the blueprint URI prefix if available
uri = url_prefix + future.uri if url_prefix else future.uri
_routes, _ = app.websocket(
uri=uri,
host=future.host or self.host,
strict_slashes=future.strict_slashes,
name=future.name,
)(future.handler)
if _routes:
routes += _routes
route = app._apply_route(apply_route)
operation = (
routes.extend if isinstance(route, list) else routes.append
)
operation(route)
# Static Files
for future in self.statics:
for future in self._future_statics:
# Prepend the blueprint URI prefix if available
uri = url_prefix + future.uri if url_prefix else future.uri
_routes = app.static(
uri, future.file_or_directory, *future.args, **future.kwargs
)
if _routes:
routes += _routes
apply_route = FutureStatic(uri, *future[1:])
route = app._apply_static(apply_route)
routes.append(route)
route_names = [route.name for route in routes if route]
# Middleware
for future in self.middlewares:
if future.args or future.kwargs:
app.register_named_middleware(
future.middleware,
route_names,
*future.args,
**future.kwargs,
)
else:
app.register_named_middleware(future.middleware, route_names)
if route_names:
for future in self._future_middleware:
app._apply_middleware(future, route_names)
# Exceptions
for future in self.exceptions:
app.exception(*future.args, **future.kwargs)(future.handler)
for future in self._future_exceptions:
app._apply_exception_handler(future)
# Event listeners
for event, listeners in self.listeners.items():
for listener in listeners:
app.listener(event)(listener)
def route(
self,
uri,
methods=frozenset({"GET"}),
host=None,
strict_slashes=None,
stream=False,
version=None,
name=None,
):
"""Create a blueprint route from a decorated function.
:param uri: endpoint at which the route will be accessible.
:param methods: list of acceptable HTTP methods.
:param host: IP Address of FQDN for the sanic server to use.
:param strict_slashes: Enforce the API urls are requested with a
training */*
:param stream: If the route should provide a streaming support
:param version: Blueprint Version
:param name: Unique name to identify the Route
:return a decorated method that when invoked will return an object
of type :class:`FutureRoute`
"""
if strict_slashes is None:
strict_slashes = self.strict_slashes
def decorator(handler):
route = FutureRoute(
handler,
uri,
methods,
host,
strict_slashes,
stream,
version,
name,
)
self.routes.append(route)
return handler
return decorator
def add_route(
self,
handler,
uri,
methods=frozenset({"GET"}),
host=None,
strict_slashes=None,
version=None,
name=None,
stream=False,
):
"""Create a blueprint route from a function.
:param handler: function for handling uri requests. Accepts function,
or class instance with a view_class method.
:param uri: endpoint at which the route will be accessible.
:param methods: list of acceptable HTTP methods.
:param host: IP Address of FQDN for the sanic server to use.
:param strict_slashes: Enforce the API urls are requested with a
training */*
:param version: Blueprint Version
:param name: user defined route name for url_for
:param stream: boolean specifying if the handler is a stream handler
:return: function or class instance
"""
# Handle HTTPMethodView differently
if hasattr(handler, "view_class"):
methods = set()
for method in HTTP_METHODS:
if getattr(handler.view_class, method.lower(), None):
methods.add(method)
if strict_slashes is None:
strict_slashes = self.strict_slashes
# handle composition view differently
if isinstance(handler, CompositionView):
methods = handler.handlers.keys()
self.route(
uri=uri,
methods=methods,
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)(handler)
return handler
def websocket(
self, uri, host=None, strict_slashes=None, version=None, name=None
):
"""Create a blueprint websocket route from a decorated function.
:param uri: endpoint at which the route will be accessible.
:param host: IP Address of FQDN for the sanic server to use.
:param strict_slashes: Enforce the API urls are requested with a
training */*
:param version: Blueprint Version
:param name: Unique name to identify the Websocket Route
"""
if strict_slashes is None:
strict_slashes = self.strict_slashes
def decorator(handler):
nonlocal uri
nonlocal host
nonlocal strict_slashes
nonlocal version
nonlocal name
name = f"{self.name}.{name or handler.__name__}"
route = FutureRoute(
handler, uri, [], host, strict_slashes, False, version, name
)
self.websocket_routes.append(route)
return handler
return decorator
def add_websocket_route(
self, handler, uri, host=None, version=None, name=None
):
"""Create a blueprint websocket route from a function.
:param handler: function for handling uri requests. Accepts function,
or class instance with a view_class method.
:param uri: endpoint at which the route will be accessible.
:param host: IP Address of FQDN for the sanic server to use.
:param version: Blueprint Version
:param name: Unique name to identify the Websocket Route
:return: function or class instance
"""
self.websocket(uri=uri, host=host, version=version, name=name)(handler)
return handler
def listener(self, event):
"""Create a listener from a decorated function.
:param event: Event to listen to.
"""
def decorator(listener):
self.listeners[event].append(listener)
return listener
return decorator
def middleware(self, *args, **kwargs):
"""
Create a blueprint middleware from a decorated function.
:param args: Positional arguments to be used while invoking the
middleware
:param kwargs: optional keyword args that can be used with the
middleware.
"""
def register_middleware(_middleware):
future_middleware = FutureMiddleware(_middleware, args, kwargs)
self.middlewares.append(future_middleware)
return _middleware
# Detect which way this was called, @middleware or @middleware('AT')
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
middleware = args[0]
args = []
return register_middleware(middleware)
else:
if kwargs.get("bp_group") and callable(args[0]):
middleware = args[0]
args = args[1:]
kwargs.pop("bp_group")
return register_middleware(middleware)
else:
return register_middleware
def exception(self, *args, **kwargs):
"""
This method enables the process of creating a global exception
handler for the current blueprint under question.
:param args: List of Python exceptions to be caught by the handler
:param kwargs: Additional optional arguments to be passed to the
exception handler
:return a decorated method to handle global exceptions for any
route registered under this blueprint.
"""
def decorator(handler):
exception = FutureException(handler, args, kwargs)
self.exceptions.append(exception)
return handler
return decorator
def static(self, uri, file_or_directory, *args, **kwargs):
"""Create a blueprint static route from a decorated function.
:param uri: endpoint at which the route will be accessible.
:param file_or_directory: Static asset.
"""
name = kwargs.pop("name", "static")
if not name.startswith(self.name + "."):
name = f"{self.name}.{name}"
kwargs.update(name=name)
strict_slashes = kwargs.get("strict_slashes")
if strict_slashes is None and self.strict_slashes is not None:
kwargs.update(strict_slashes=self.strict_slashes)
static = FutureStatic(uri, file_or_directory, args, kwargs)
self.statics.append(static)
# Shorthand method decorators
def get(
self, uri, host=None, strict_slashes=None, version=None, name=None
):
"""
Add an API URL under the **GET** *HTTP* method
:param uri: URL to be tagged to **GET** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`sanic.app.Sanic` to check
if the request URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"GET"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
)
def post(
self,
uri,
host=None,
strict_slashes=None,
stream=False,
version=None,
name=None,
):
"""
Add an API URL under the **POST** *HTTP* method
:param uri: URL to be tagged to **POST** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`sanic.app.Sanic` to check
if the request URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"POST"}),
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)
def put(
self,
uri,
host=None,
strict_slashes=None,
stream=False,
version=None,
name=None,
):
"""
Add an API URL under the **PUT** *HTTP* method
:param uri: URL to be tagged to **PUT** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`sanic.app.Sanic` to check
if the request URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"PUT"}),
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)
def head(
self, uri, host=None, strict_slashes=None, version=None, name=None
):
"""
Add an API URL under the **HEAD** *HTTP* method
:param uri: URL to be tagged to **HEAD** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`sanic.app.Sanic` to check
if the request URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"HEAD"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
)
def options(
self, uri, host=None, strict_slashes=None, version=None, name=None
):
"""
Add an API URL under the **OPTIONS** *HTTP* method
:param uri: URL to be tagged to **OPTIONS** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`sanic.app.Sanic` to check
if the request URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"OPTIONS"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
)
def patch(
self,
uri,
host=None,
strict_slashes=None,
stream=False,
version=None,
name=None,
):
"""
Add an API URL under the **PATCH** *HTTP* method
:param uri: URL to be tagged to **PATCH** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`sanic.app.Sanic` to check
if the request URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"PATCH"}),
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)
def delete(
self, uri, host=None, strict_slashes=None, version=None, name=None
):
"""
Add an API URL under the **DELETE** *HTTP* method
:param uri: URL to be tagged to **DELETE** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`sanic.app.Sanic` to check
if the request URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"DELETE"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
)
for listener in self._future_listeners:
app._apply_listener(listener)

View File

@ -17,7 +17,7 @@ class Header(CIMultiDict):
use_trio = argv[0].endswith("hypercorn") and "trio" in argv
if use_trio:
if use_trio: # pragma: no cover
import trio # type: ignore
def stat_async(path):

0
sanic/mixins/__init__.py Normal file
View File

View File

@ -0,0 +1,39 @@
from typing import Set
from sanic.models.futures import FutureException
class ExceptionMixin:
def __init__(self, *args, **kwargs) -> None:
self._future_exceptions: Set[FutureException] = set()
def _apply_exception_handler(self, handler: FutureException):
raise NotImplementedError # noqa
def exception(self, *exceptions, apply=True):
"""
This method enables the process of creating a global exception
handler for the current blueprint under question.
:param args: List of Python exceptions to be caught by the handler
:param kwargs: Additional optional arguments to be passed to the
exception handler
:return a decorated method to handle global exceptions for any
route registered under this blueprint.
"""
def decorator(handler):
nonlocal apply
nonlocal exceptions
if isinstance(exceptions[0], list):
exceptions = tuple(*exceptions)
future_exception = FutureException(handler, exceptions)
self._future_exceptions.add(future_exception)
if apply:
self._apply_exception_handler(future_exception)
return handler
return decorator

55
sanic/mixins/listeners.py Normal file
View File

@ -0,0 +1,55 @@
from enum import Enum, auto
from functools import partial
from typing import List
from sanic.models.futures import FutureListener
class ListenerEvent(str, Enum):
def _generate_next_value_(name: str, *args) -> str: # type: ignore
return name.lower()
BEFORE_SERVER_START = auto()
AFTER_SERVER_START = auto()
BEFORE_SERVER_STOP = auto()
AFTER_SERVER_STOP = auto()
class ListenerMixin:
def __init__(self, *args, **kwargs) -> None:
self._future_listeners: List[FutureListener] = list()
def _apply_listener(self, listener: FutureListener):
raise NotImplementedError # noqa
def listener(self, listener_or_event, event_or_none=None, apply=True):
"""Create a listener from a decorated function.
:param event: Event to listen to.
"""
def register_listener(listener, event):
nonlocal apply
future_listener = FutureListener(listener, event)
self._future_listeners.append(future_listener)
if apply:
self._apply_listener(future_listener)
return listener
if callable(listener_or_event):
return register_listener(listener_or_event, event_or_none)
else:
return partial(register_listener, event=listener_or_event)
def before_server_start(self, listener):
return self.listener(listener, "before_server_start")
def after_server_start(self, listener):
return self.listener(listener, "after_server_start")
def before_server_stop(self, listener):
return self.listener(listener, "before_server_stop")
def after_server_stop(self, listener):
return self.listener(listener, "after_server_stop")

View File

@ -0,0 +1,55 @@
from functools import partial
from typing import List
from sanic.models.futures import FutureMiddleware
class MiddlewareMixin:
def __init__(self, *args, **kwargs) -> None:
self._future_middleware: List[FutureMiddleware] = list()
def _apply_middleware(self, middleware: FutureMiddleware):
raise NotImplementedError # noqa
def middleware(
self, middleware_or_request, attach_to="request", apply=True
):
"""
Decorate and register middleware to be called before a request.
Can either be called as *@app.middleware* or
*@app.middleware('request')*
:param: middleware_or_request: Optional parameter to use for
identifying which type of middleware is being registered.
"""
def register_middleware(middleware, attach_to="request"):
nonlocal apply
future_middleware = FutureMiddleware(middleware, attach_to)
self._future_middleware.append(future_middleware)
if apply:
self._apply_middleware(future_middleware)
return middleware
# Detect which way this was called, @middleware or @middleware('AT')
if callable(middleware_or_request):
return register_middleware(
middleware_or_request, attach_to=attach_to
)
else:
return partial(
register_middleware, attach_to=middleware_or_request
)
def on_request(self, middleware=None):
if callable(middleware):
return self.middleware(middleware, "request")
else:
return partial(self.middleware, attach_to="request")
def on_response(self, middleware=None):
if callable(middleware):
return self.middleware(middleware, "response")
else:
return partial(self.middleware, attach_to="response")

761
sanic/mixins/routes.py Normal file
View File

@ -0,0 +1,761 @@
from functools import partial, wraps
from inspect import signature
from mimetypes import guess_type
from os import path
from pathlib import PurePath
from re import sub
from time import gmtime, strftime
from typing import Set, Union
from urllib.parse import unquote
from sanic_routing.route import Route # type: ignore
from sanic.compat import stat_async
from sanic.constants import HTTP_METHODS
from sanic.exceptions import (
ContentRangeError,
FileNotFound,
HeaderNotFound,
InvalidUsage,
)
from sanic.handlers import ContentRangeHandler
from sanic.log import error_logger
from sanic.models.futures import FutureRoute, FutureStatic
from sanic.response import HTTPResponse, file, file_stream
from sanic.views import CompositionView
class RouteMixin:
def __init__(self, *args, **kwargs) -> None:
self._future_routes: Set[FutureRoute] = set()
self._future_statics: Set[FutureStatic] = set()
self.name = ""
self.strict_slashes = False
def _apply_route(self, route: FutureRoute) -> Route:
raise NotImplementedError # noqa
def _apply_static(self, static: FutureStatic) -> Route:
raise NotImplementedError # noqa
def route(
self,
uri,
methods=None,
host=None,
strict_slashes=None,
stream=False,
version=None,
name=None,
ignore_body=False,
apply=True,
subprotocols=None,
websocket=False,
unquote=False,
static=False,
):
"""Create a blueprint route from a decorated function.
:param uri: endpoint at which the route will be accessible.
:param methods: list of acceptable HTTP methods.
:param host: IP Address of FQDN for the sanic server to use.
:param strict_slashes: Enforce the API urls are requested with a
training */*
:param stream: If the route should provide a streaming support
:param version: Blueprint Version
:param name: Unique name to identify the Route
:return a decorated method that when invoked will return an object
of type :class:`FutureRoute`
"""
# Fix case where the user did not prefix the URL with a /
# and will probably get confused as to why it's not working
if not uri.startswith("/"):
uri = "/" + uri
if strict_slashes is None:
strict_slashes = self.strict_slashes
if not methods and not websocket:
methods = frozenset({"GET"})
def decorator(handler):
nonlocal uri
nonlocal methods
nonlocal host
nonlocal strict_slashes
nonlocal stream
nonlocal version
nonlocal name
nonlocal ignore_body
nonlocal subprotocols
nonlocal websocket
nonlocal static
if isinstance(handler, tuple):
# if a handler fn is already wrapped in a route, the handler
# variable will be a tuple of (existing routes, handler fn)
_, handler = handler
name = self._generate_name(name, handler)
if isinstance(host, str):
host = frozenset([host])
elif host and not isinstance(host, frozenset):
try:
host = frozenset(host)
except TypeError:
raise ValueError(
"Expected either string or Iterable of host strings, "
"not %s" % host
)
if isinstance(subprotocols, (list, tuple, set)):
subprotocols = frozenset(subprotocols)
route = FutureRoute(
handler,
uri,
None if websocket else frozenset([x.upper() for x in methods]),
host,
strict_slashes,
stream,
version,
name,
ignore_body,
websocket,
subprotocols,
unquote,
static,
)
self._future_routes.add(route)
args = list(signature(handler).parameters.keys())
if websocket and len(args) < 2:
handler_name = handler.__name__
raise ValueError(
f"Required parameter `request` and/or `ws` missing "
f"in the {handler_name}() route?"
)
elif not args:
handler_name = handler.__name__
raise ValueError(
f"Required parameter `request` missing "
f"in the {handler_name}() route?"
)
if not websocket and stream:
handler.is_stream = stream
if apply:
self._apply_route(route)
return route, handler
return decorator
def add_route(
self,
handler,
uri,
methods=frozenset({"GET"}),
host=None,
strict_slashes=None,
version=None,
name=None,
stream=False,
):
"""A helper method to register class instance or
functions as a handler to the application url
routes.
:param handler: function or class instance
:param uri: path of the URL
:param methods: list or tuple of methods allowed, these are overridden
if using a HTTPMethodView
:param host:
:param strict_slashes:
:param version:
:param name: user defined route name for url_for
:param stream: boolean specifying if the handler is a stream handler
:return: function or class instance
"""
# Handle HTTPMethodView differently
if hasattr(handler, "view_class"):
methods = set()
for method in HTTP_METHODS:
_handler = getattr(handler.view_class, method.lower(), None)
if _handler:
methods.add(method)
if hasattr(_handler, "is_stream"):
stream = True
# handle composition view differently
if isinstance(handler, CompositionView):
methods = handler.handlers.keys()
for _handler in handler.handlers.values():
if hasattr(_handler, "is_stream"):
stream = True
break
if strict_slashes is None:
strict_slashes = self.strict_slashes
self.route(
uri=uri,
methods=methods,
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)(handler)
return handler
# Shorthand method decorators
def get(
self,
uri,
host=None,
strict_slashes=None,
version=None,
name=None,
ignore_body=True,
):
"""
Add an API URL under the **GET** *HTTP* method
:param uri: URL to be tagged to **GET** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"GET"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
ignore_body=ignore_body,
)
def post(
self,
uri,
host=None,
strict_slashes=None,
stream=False,
version=None,
name=None,
):
"""
Add an API URL under the **POST** *HTTP* method
:param uri: URL to be tagged to **POST** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"POST"}),
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)
def put(
self,
uri,
host=None,
strict_slashes=None,
stream=False,
version=None,
name=None,
):
"""
Add an API URL under the **PUT** *HTTP* method
:param uri: URL to be tagged to **PUT** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"PUT"}),
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)
def head(
self,
uri,
host=None,
strict_slashes=None,
version=None,
name=None,
ignore_body=True,
):
return self.route(
uri,
methods=frozenset({"HEAD"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
ignore_body=ignore_body,
)
def options(
self,
uri,
host=None,
strict_slashes=None,
version=None,
name=None,
ignore_body=True,
):
"""
Add an API URL under the **OPTIONS** *HTTP* method
:param uri: URL to be tagged to **OPTIONS** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"OPTIONS"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
ignore_body=ignore_body,
)
def patch(
self,
uri,
host=None,
strict_slashes=None,
stream=False,
version=None,
name=None,
):
"""
Add an API URL under the **PATCH** *HTTP* method
:param uri: URL to be tagged to **PATCH** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"PATCH"}),
host=host,
strict_slashes=strict_slashes,
stream=stream,
version=version,
name=name,
)
def delete(
self,
uri,
host=None,
strict_slashes=None,
version=None,
name=None,
ignore_body=True,
):
"""
Add an API URL under the **DELETE** *HTTP* method
:param uri: URL to be tagged to **DELETE** method of *HTTP*
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param version: API Version
:param name: Unique name that can be used to identify the Route
:return: Object decorated with :func:`route` method
"""
return self.route(
uri,
methods=frozenset({"DELETE"}),
host=host,
strict_slashes=strict_slashes,
version=version,
name=name,
ignore_body=ignore_body,
)
def websocket(
self,
uri,
host=None,
strict_slashes=None,
version=None,
name=None,
subprotocols=None,
apply: bool = True,
):
"""Create a blueprint websocket route from a decorated function.
:param uri: endpoint at which the route will be accessible.
:param host: IP Address of FQDN for the sanic server to use.
:param strict_slashes: Enforce the API urls are requested with a
training */*
:param version: Blueprint Version
:param name: Unique name to identify the Websocket Route
"""
return self.route(
uri=uri,
host=host,
methods=None,
strict_slashes=strict_slashes,
version=version,
name=name,
apply=apply,
subprotocols=subprotocols,
websocket=True,
)
def add_websocket_route(
self,
handler,
uri,
host=None,
strict_slashes=None,
subprotocols=None,
version=None,
name=None,
):
"""
A helper method to register a function as a websocket route.
:param handler: a callable function or instance of a class
that can handle the websocket request
:param host: Host IP or FQDN details
:param uri: URL path that will be mapped to the websocket
handler
handler
:param strict_slashes: If the API endpoint needs to terminate
with a "/" or not
:param subprotocols: Subprotocols to be used with websocket
handshake
:param name: A unique name assigned to the URL so that it can
be used with :func:`url_for`
:return: Objected decorated by :func:`websocket`
"""
return self.websocket(
uri=uri,
host=host,
strict_slashes=strict_slashes,
subprotocols=subprotocols,
version=version,
name=name,
)(handler)
def static(
self,
uri,
file_or_directory: Union[str, bytes, PurePath],
pattern=r"/?.+",
use_modified_since=True,
use_content_range=False,
stream_large_files=False,
name="static",
host=None,
strict_slashes=None,
content_type=None,
apply=True,
):
"""
Register a root to serve files from. The input can either be a
file or a directory. This method will enable an easy and simple way
to setup the :class:`Route` necessary to serve the static files.
:param uri: URL path to be used for serving static content
:param file_or_directory: Path for the Static file/directory with
static files
:param pattern: Regex Pattern identifying the valid static files
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the
:func:`StreamingHTTPResponse.file_stream` handler rather
than the :func:`HTTPResponse.file` handler to send the file.
If this is an integer, this represents the threshold size to
switch to :func:`StreamingHTTPResponse.file_stream`
:param name: user defined name used for url_for
:param host: Host IP or FQDN for the service to use
:param strict_slashes: Instruct :class:`Sanic` to check if the request
URLs need to terminate with a */*
:param content_type: user defined content type for header
:return: routes registered on the router
:rtype: List[sanic.router.Route]
"""
name = self._generate_name(name)
if strict_slashes is None and self.strict_slashes is not None:
strict_slashes = self.strict_slashes
if not isinstance(file_or_directory, (str, bytes, PurePath)):
raise ValueError(
f"Static route must be a valid path, not {file_or_directory}"
)
static = FutureStatic(
uri,
file_or_directory,
pattern,
use_modified_since,
use_content_range,
stream_large_files,
name,
host,
strict_slashes,
content_type,
)
self._future_statics.add(static)
if apply:
self._apply_static(static)
def _generate_name(self, *objects) -> str:
name = None
for obj in objects:
if obj:
if isinstance(obj, str):
name = obj
break
try:
name = obj.name
except AttributeError:
try:
name = obj.__name__
except AttributeError:
continue
else:
break
if not name: # noq
raise ValueError("Could not generate a name for handler")
if not name.startswith(f"{self.name}."):
name = f"{self.name}.{name}"
return name
async def _static_request_handler(
self,
file_or_directory,
use_modified_since,
use_content_range,
stream_large_files,
request,
content_type=None,
file_uri=None,
):
# Using this to determine if the URL is trying to break out of the path
# served. os.path.realpath seems to be very slow
if file_uri and "../" in file_uri:
raise InvalidUsage("Invalid URL")
# Merge served directory and requested file if provided
# Strip all / that in the beginning of the URL to help prevent python
# from herping a derp and treating the uri as an absolute path
root_path = file_path = file_or_directory
if file_uri:
file_path = path.join(
file_or_directory, sub("^[/]*", "", file_uri)
)
# URL decode the path sent by the browser otherwise we won't be able to
# match filenames which got encoded (filenames with spaces etc)
file_path = path.abspath(unquote(file_path))
if not file_path.startswith(path.abspath(unquote(root_path))):
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={file_uri}"
)
raise FileNotFound(
"File not found", path=file_or_directory, relative_url=file_uri
)
try:
headers = {}
# Check if the client has been sent this file before
# and it has not been modified since
stats = None
if use_modified_since:
stats = await stat_async(file_path)
modified_since = strftime(
"%a, %d %b %Y %H:%M:%S GMT", gmtime(stats.st_mtime)
)
if request.headers.get("If-Modified-Since") == modified_since:
return HTTPResponse(status=304)
headers["Last-Modified"] = modified_since
_range = None
if use_content_range:
_range = None
if not stats:
stats = await stat_async(file_path)
headers["Accept-Ranges"] = "bytes"
headers["Content-Length"] = str(stats.st_size)
if request.method != "HEAD":
try:
_range = ContentRangeHandler(request, stats)
except HeaderNotFound:
pass
else:
del headers["Content-Length"]
for key, value in _range.headers.items():
headers[key] = value
if "content-type" not in headers:
content_type = (
content_type
or guess_type(file_path)[0]
or "application/octet-stream"
)
if "charset=" not in content_type and (
content_type.startswith("text/")
or content_type == "application/javascript"
):
content_type += "; charset=utf-8"
headers["Content-Type"] = content_type
if request.method == "HEAD":
return HTTPResponse(headers=headers)
else:
if stream_large_files:
if type(stream_large_files) == int:
threshold = stream_large_files
else:
threshold = 1024 * 1024
if not stats:
stats = await stat_async(file_path)
if stats.st_size >= threshold:
return await file_stream(
file_path, headers=headers, _range=_range
)
return await file(file_path, headers=headers, _range=_range)
except ContentRangeError:
raise
except Exception:
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={file_uri}"
)
raise FileNotFound(
"File not found", path=file_or_directory, relative_url=file_uri
)
def _register_static(
self,
static: FutureStatic,
):
# TODO: Though sanic is not a file server, I feel like we should
# at least make a good effort here. Modified-since is nice, but
# we could also look into etags, expires, and caching
"""
Register a static directory handler with Sanic by adding a route to the
router and registering a handler.
:param app: Sanic
:param file_or_directory: File or directory path to serve from
:type file_or_directory: Union[str,bytes,Path]
:param uri: URL to serve from
:type uri: str
:param pattern: regular expression used to match files in the URL
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the
server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the file_stream() handler
rather than the file() handler to send the file
If this is an integer, this represents the
threshold size to switch to file_stream()
:param name: user defined name used for url_for
:type name: str
:param content_type: user defined content type for header
:return: registered static routes
:rtype: List[sanic.router.Route]
"""
if isinstance(static.file_or_directory, bytes):
file_or_directory = static.file_or_directory.decode("utf-8")
elif isinstance(static.file_or_directory, PurePath):
file_or_directory = str(static.file_or_directory)
elif not isinstance(static.file_or_directory, str):
raise ValueError("Invalid file path string.")
else:
file_or_directory = static.file_or_directory
uri = static.uri
name = static.name
# If we're not trying to match a file directly,
# serve from the folder
if not path.isfile(file_or_directory):
uri += "/<file_uri>"
# special prefix for static files
# if not static.name.startswith("_static_"):
# name = f"_static_{static.name}"
_handler = wraps(self._static_request_handler)(
partial(
self._static_request_handler,
file_or_directory,
static.use_modified_since,
static.use_content_range,
static.stream_large_files,
content_type=static.content_type,
)
)
route, _ = self.route(
uri=uri,
methods=["GET", "HEAD"],
name=name,
host=static.host,
strict_slashes=static.strict_slashes,
static=True,
)(_handler)
return route

0
sanic/models/__init__.py Normal file
View File

39
sanic/models/futures.py Normal file
View File

@ -0,0 +1,39 @@
from collections import namedtuple
FutureRoute = namedtuple(
"FutureRoute",
[
"handler",
"uri",
"methods",
"host",
"strict_slashes",
"stream",
"version",
"name",
"ignore_body",
"websocket",
"subprotocols",
"unquote",
"static",
],
)
FutureListener = namedtuple("FutureListener", ["listener", "event"])
FutureMiddleware = namedtuple("FutureMiddleware", ["middleware", "attach_to"])
FutureException = namedtuple("FutureException", ["handler", "exceptions"])
FutureStatic = namedtuple(
"FutureStatic",
[
"uri",
"file_or_directory",
"pattern",
"use_modified_since",
"use_content_range",
"stream_large_files",
"name",
"host",
"strict_slashes",
"content_type",
],
)

View File

@ -58,6 +58,7 @@ class Request:
"_port",
"_remote_addr",
"_socket",
"_match_info",
"app",
"body",
"conn_info",
@ -106,6 +107,7 @@ class Request:
self.uri_template = None
self.request_middleware_started = False
self._cookies = None
self._match_info = {}
self.stream = None
self.endpoint = None
@ -370,7 +372,7 @@ class Request:
@property
def match_info(self):
"""return matched info after resolving route"""
return self.app.router.get(self)[2]
return self._match_info
# Transport properties (obtained from local interface only)

View File

@ -1,494 +1,156 @@
import re
import uuid
from collections import defaultdict, namedtuple
from collections.abc import Iterable
from functools import lru_cache
from urllib.parse import unquote
from typing import Iterable, List, Optional, Union
from sanic.exceptions import MethodNotSupported, NotFound
from sanic.views import CompositionView
Route = namedtuple(
"Route",
[
"handler",
"methods",
"pattern",
"parameters",
"name",
"uri",
"endpoint",
"ignore_body",
],
from sanic_routing import BaseRouter # type: ignore
from sanic_routing.exceptions import NoMethod # type: ignore
from sanic_routing.exceptions import (
NotFound as RoutingNotFound, # type: ignore
)
Parameter = namedtuple("Parameter", ["name", "cast"])
from sanic_routing.route import Route # type: ignore
from sanic.constants import HTTP_METHODS
from sanic.exceptions import MethodNotSupported, NotFound
from sanic.request import Request
REGEX_TYPES = {
"string": (str, r"[^/]+"),
"int": (int, r"-?\d+"),
"number": (float, r"-?(?:\d+(?:\.\d*)?|\.\d+)"),
"alpha": (str, r"[A-Za-z]+"),
"path": (str, r"[^/].*?"),
"uuid": (
uuid.UUID,
r"[A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-"
r"[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}",
),
}
ROUTER_CACHE_SIZE = 1024
def url_hash(url):
return url.count("/")
class RouteExists(Exception):
pass
class RouteDoesNotExist(Exception):
pass
class ParameterNameConflicts(Exception):
pass
class Router:
"""Router supports basic routing with parameters and method checks
Usage:
.. code-block:: python
@sanic.route('/my/url/<my_param>', methods=['GET', 'POST', ...])
def my_route(request, my_param):
do stuff...
or
.. code-block:: python
@sanic.route('/my/url/<my_param:my_type>', methods['GET', 'POST', ...])
def my_route_with_type(request, my_param: my_type):
do stuff...
Parameters will be passed as keyword arguments to the request handling
function. Provided parameters can also have a type by appending :type to
the <parameter>. Given parameter must be able to be type-casted to this.
If no type is provided, a string is expected. A regular expression can
also be passed in as the type. The argument given to the function will
always be a string, independent of the type.
class Router(BaseRouter):
"""
The router implementation responsible for routing a :class:`Request` object
to the appropriate handler.
"""
routes_static = None
routes_dynamic = None
routes_always_check = None
parameter_pattern = re.compile(r"<(.+?)>")
DEFAULT_METHOD = "GET"
ALLOWED_METHODS = HTTP_METHODS
def __init__(self, app):
self.app = app
self.routes_all = {}
self.routes_names = {}
self.routes_static_files = {}
self.routes_static = {}
self.routes_dynamic = defaultdict(list)
self.routes_always_check = []
self.hosts = set()
@classmethod
def parse_parameter_string(cls, parameter_string):
"""Parse a parameter string into its constituent name, type, and
pattern
For example::
parse_parameter_string('<param_one:[A-z]>')` ->
('param_one', str, '[A-z]')
:param parameter_string: String to parse
:return: tuple containing
(parameter_name, parameter_type, parameter_pattern)
"""
# We could receive NAME or NAME:PATTERN
name = parameter_string
pattern = "string"
if ":" in parameter_string:
name, pattern = parameter_string.split(":", 1)
if not name:
raise ValueError(
f"Invalid parameter syntax: {parameter_string}"
# Putting the lru_cache on Router.get() performs better for the benchmarsk
# at tests/benchmark/test_route_resolution_benchmark.py
# However, overall application performance is significantly improved
# with the lru_cache on this method.
@lru_cache(maxsize=ROUTER_CACHE_SIZE)
def _get(self, path, method, host):
try:
route, handler, params = self.resolve(
path=path,
method=method,
extra={"host": host},
)
except RoutingNotFound as e:
raise NotFound("Requested URL {} not found".format(e.path))
except NoMethod as e:
raise MethodNotSupported(
"Method {} not allowed for URL {}".format(method, path),
method=method,
allowed_methods=e.allowed_methods,
)
default = (str, pattern)
# Pull from pre-configured types
_type, pattern = REGEX_TYPES.get(pattern, default)
return (
handler,
params,
route.path,
route.name,
route.ctx.ignore_body,
)
return name, _type, pattern
def get(self, request: Request):
"""
Retrieve a `Route` object containg the details about how to handle
a response for a given request
:param request: the incoming request object
:type request: Request
:return: details needed for handling the request and returning the
correct response
:rtype: Tuple[ RouteHandler, Tuple[Any, ...], Dict[str, Any], str, str,
Optional[str], bool, ]
"""
return self._get(
request.path, request.method, request.headers.get("host")
)
def add(
self,
uri,
methods,
uri: str,
methods: Iterable[str],
handler,
host=None,
strict_slashes=False,
ignore_body=False,
version=None,
name=None,
):
"""Add a handler to the route list
:param uri: path to match
:param methods: sequence of accepted method names. If none are
provided, any method is allowed
:param handler: request handler function.
When executed, it should provide a response object.
:param strict_slashes: strict to trailing slash
:param ignore_body: Handler should not read the body, if any
:param version: current version of the route or blueprint. See
docs for further details.
:return: Nothing
host: Optional[Union[str, Iterable[str]]] = None,
strict_slashes: bool = False,
stream: bool = False,
ignore_body: bool = False,
version: Union[str, float, int] = None,
name: Optional[str] = None,
unquote: bool = False,
static: bool = False,
) -> Union[Route, List[Route]]:
"""
Add a handler to the router
:param uri: the path of the route
:type uri: str
:param methods: the types of HTTP methods that should be attached,
example: ``["GET", "POST", "OPTIONS"]``
:type methods: Iterable[str]
:param handler: the sync or async function to be executed
:type handler: RouteHandler
:param host: host that the route should be on, defaults to None
:type host: Optional[str], optional
:param strict_slashes: whether to apply strict slashes, defaults
to False
:type strict_slashes: bool, optional
:param stream: whether to stream the response, defaults to False
:type stream: bool, optional
:param ignore_body: whether the incoming request body should be read,
defaults to False
:type ignore_body: bool, optional
:param version: a version modifier for the uri, defaults to None
:type version: Union[str, float, int], optional
:param name: an identifying name of the route, defaults to None
:type name: Optional[str], optional
:return: the route object
:rtype: Route
"""
routes = []
if version is not None:
version = re.escape(str(version).strip("/").lstrip("v"))
version = str(version).strip("/").lstrip("v")
uri = "/".join([f"/v{version}", uri.lstrip("/")])
# add regular version
routes.append(
self._add(uri, methods, handler, host, name, ignore_body)
)
if strict_slashes:
return routes
if not isinstance(host, str) and host is not None:
# we have gotten back to the top of the recursion tree where the
# host was originally a list. By now, we've processed the strict
# slashes logic on the leaf nodes (the individual host strings in
# the list of host)
return routes
# Add versions with and without trailing /
slashed_methods = self.routes_all.get(uri + "/", frozenset({}))
unslashed_methods = self.routes_all.get(uri[:-1], frozenset({}))
if isinstance(methods, Iterable):
_slash_is_missing = all(
method in slashed_methods for method in methods
)
_without_slash_is_missing = all(
method in unslashed_methods for method in methods
)
else:
_slash_is_missing = methods in slashed_methods
_without_slash_is_missing = methods in unslashed_methods
slash_is_missing = not uri[-1] == "/" and not _slash_is_missing
without_slash_is_missing = (
uri[-1] == "/" and not _without_slash_is_missing and not uri == "/"
)
# add version with trailing slash
if slash_is_missing:
routes.append(
self._add(uri + "/", methods, handler, host, name, ignore_body)
)
# add version without trailing slash
elif without_slash_is_missing:
routes.append(
self._add(uri[:-1], methods, handler, host, name, ignore_body)
)
return routes
def _add(
self, uri, methods, handler, host=None, name=None, ignore_body=False
):
"""Add a handler to the route list
:param uri: path to match
:param methods: sequence of accepted method names. If none are
provided, any method is allowed
:param handler: request handler function.
When executed, it should provide a response object.
:param name: user defined route name for url_for
:return: Nothing
"""
if host is not None:
if isinstance(host, str):
uri = host + uri
self.hosts.add(host)
else:
if not isinstance(host, Iterable):
raise ValueError(
f"Expected either string or Iterable of "
f"host strings, not {host!r}"
)
for host_ in host:
self.add(uri, methods, handler, host_, name)
return
# Dict for faster lookups of if method allowed
if methods:
methods = frozenset(methods)
parameters = []
parameter_names = set()
properties = {"unhashable": None}
def add_parameter(match):
name = match.group(1)
name, _type, pattern = self.parse_parameter_string(name)
if name in parameter_names:
raise ParameterNameConflicts(
f"Multiple parameter named <{name}> " f"in route uri {uri}"
)
parameter_names.add(name)
parameter = Parameter(name=name, cast=_type)
parameters.append(parameter)
# Mark the whole route as unhashable if it has the hash key in it
if re.search(r"(^|[^^]){1}/", pattern):
properties["unhashable"] = True
# Mark the route as unhashable if it matches the hash key
elif re.search(r"/", pattern):
properties["unhashable"] = True
return f"({pattern})"
pattern_string = re.sub(self.parameter_pattern, add_parameter, uri)
pattern = re.compile(fr"^{pattern_string}$")
def merge_route(route, methods, handler):
# merge to the existing route when possible.
if not route.methods or not methods:
# method-unspecified routes are not mergeable.
raise RouteExists(f"Route already registered: {uri}")
elif route.methods.intersection(methods):
# already existing method is not overloadable.
duplicated = methods.intersection(route.methods)
duplicated_methods = ",".join(list(duplicated))
raise RouteExists(
f"Route already registered: {uri} [{duplicated_methods}]"
)
if isinstance(route.handler, CompositionView):
view = route.handler
else:
view = CompositionView()
view.add(route.methods, route.handler)
view.add(methods, handler)
route = route._replace(
handler=view, methods=methods.union(route.methods)
)
return route
if parameters:
# TODO: This is too complex, we need to reduce the complexity
if properties["unhashable"]:
routes_to_check = self.routes_always_check
ndx, route = self.check_dynamic_route_exists(
pattern, routes_to_check, parameters
)
else:
routes_to_check = self.routes_dynamic[url_hash(uri)]
ndx, route = self.check_dynamic_route_exists(
pattern, routes_to_check, parameters
)
if ndx != -1:
# Pop the ndx of the route, no dups of the same route
routes_to_check.pop(ndx)
else:
route = self.routes_all.get(uri)
# prefix the handler name with the blueprint name
# if available
# special prefix for static files
is_static = False
if name and name.startswith("_static_"):
is_static = True
name = name.split("_static_", 1)[-1]
if hasattr(handler, "__blueprintname__"):
bp_name = handler.__blueprintname__
handler_name = f"{bp_name}.{name or handler.__name__}"
else:
handler_name = name or getattr(
handler, "__name__", handler.__class__.__name__
)
if route:
route = merge_route(route, methods, handler)
else:
endpoint = self.app._build_endpoint_name(handler_name)
route = Route(
params = dict(
path=uri,
handler=handler,
methods=methods,
pattern=pattern,
parameters=parameters,
name=handler_name,
uri=uri,
endpoint=endpoint,
ignore_body=ignore_body,
name=name,
strict=strict_slashes,
unquote=unquote,
)
self.routes_all[uri] = route
if is_static:
pair = self.routes_static_files.get(handler_name)
if not (pair and (pair[0] + "/" == uri or uri + "/" == pair[0])):
self.routes_static_files[handler_name] = (uri, route)
if isinstance(host, str):
hosts = [host]
else:
pair = self.routes_names.get(handler_name)
if not (pair and (pair[0] + "/" == uri or uri + "/" == pair[0])):
self.routes_names[handler_name] = (uri, route)
hosts = host or [None] # type: ignore
if properties["unhashable"]:
self.routes_always_check.append(route)
elif parameters:
self.routes_dynamic[url_hash(uri)].append(route)
else:
self.routes_static[uri] = route
return route
routes = []
@staticmethod
def check_dynamic_route_exists(pattern, routes_to_check, parameters):
for host in hosts:
if host:
params.update({"requirements": {"host": host}})
route = super().add(**params)
route.ctx.ignore_body = ignore_body
route.ctx.stream = stream
route.ctx.hosts = hosts
route.ctx.static = static
routes.append(route)
if len(routes) == 1:
return routes[0]
return routes
def is_stream_handler(self, request) -> bool:
"""
Check if a URL pattern exists in a list of routes provided based on
the comparison of URL pattern and the parameters.
Handler for request is stream or not.
:param pattern: URL parameter pattern
:param routes_to_check: list of dynamic routes either hashable or
unhashable routes.
:param parameters: List of :class:`Parameter` items
:return: Tuple of index and route if matching route exists else
-1 for index and None for route
"""
for ndx, route in enumerate(routes_to_check):
if route.pattern == pattern and route.parameters == parameters:
return ndx, route
else:
return -1, None
@lru_cache(maxsize=ROUTER_CACHE_SIZE)
def find_route_by_view_name(self, view_name, name=None):
"""Find a route in the router based on the specified view name.
:param view_name: string of view name to search by
:param kwargs: additional params, usually for static files
:return: tuple containing (uri, Route)
"""
if not view_name:
return (None, None)
if view_name == "static" or view_name.endswith(".static"):
return self.routes_static_files.get(name, (None, None))
return self.routes_names.get(view_name, (None, None))
def get(self, request):
"""Get a request handler based on the URL of the request, or raises an
error
:param request: Request object
:return: handler, arguments, keyword arguments
"""
# No virtual hosts specified; default behavior
if not self.hosts:
return self._get(request.path, request.method, "")
# virtual hosts specified; try to match route to the host header
try:
return self._get(
request.path, request.method, request.headers.get("Host", "")
)
# try default hosts
except NotFound:
return self._get(request.path, request.method, "")
def get_supported_methods(self, url):
"""Get a list of supported methods for a url and optional host.
:param url: URL string (including host)
:return: frozenset of supported methods
"""
route = self.routes_all.get(url)
# if methods are None then this logic will prevent an error
return getattr(route, "methods", None) or frozenset()
@lru_cache(maxsize=ROUTER_CACHE_SIZE)
def _get(self, url, method, host):
"""Get a request handler based on the URL of the request, or raises an
error. Internal method for caching.
:param url: request URL
:param method: request method
:return: handler, arguments, keyword arguments
"""
url = unquote(host + url)
# Check against known static routes
route = self.routes_static.get(url)
method_not_supported = MethodNotSupported(
f"Method {method} not allowed for URL {url}",
method=method,
allowed_methods=self.get_supported_methods(url),
)
if route:
if route.methods and method not in route.methods:
raise method_not_supported
match = route.pattern.match(url)
else:
route_found = False
# Move on to testing all regex routes
for route in self.routes_dynamic[url_hash(url)]:
match = route.pattern.match(url)
route_found |= match is not None
# Do early method checking
if match and method in route.methods:
break
else:
# Lastly, check against all regex routes that cannot be hashed
for route in self.routes_always_check:
match = route.pattern.match(url)
route_found |= match is not None
# Do early method checking
if match and method in route.methods:
break
else:
# Route was found but the methods didn't match
if route_found:
raise method_not_supported
raise NotFound(f"Requested URL {url} not found")
kwargs = {
p.name: p.cast(value)
for value, p in zip(match.groups(1), route.parameters)
}
route_handler = route.handler
if hasattr(route_handler, "handlers"):
route_handler = route_handler.handlers[method]
return (
route_handler,
[],
kwargs,
route.uri,
route.name,
route.endpoint,
route.ignore_body,
)
def is_stream_handler(self, request):
"""Handler for request is stream or not.
:param request: Request object
:return: bool
"""
@ -501,3 +163,41 @@ class Router:
):
handler = getattr(handler.view_class, request.method.lower())
return hasattr(handler, "is_stream")
@lru_cache(maxsize=ROUTER_CACHE_SIZE)
def find_route_by_view_name(self, view_name, name=None):
"""
Find a route in the router based on the specified view name.
:param view_name: string of view name to search by
:param kwargs: additional params, usually for static files
:return: tuple containing (uri, Route)
"""
if not view_name:
return None
route = self.name_index.get(view_name)
if not route:
full_name = self.ctx.app._generate_name(view_name)
route = self.name_index.get(full_name)
if not route:
return None
return route
@property
def routes_all(self):
return self.routes
@property
def routes_static(self):
return self.static_routes
@property
def routes_dynamic(self):
return self.dynamic_routes
@property
def routes_regex(self):
return self.regex_routes

View File

@ -1,189 +0,0 @@
from functools import partial, wraps
from mimetypes import guess_type
from os import path
from pathlib import PurePath
from re import sub
from time import gmtime, strftime
from typing import Union
from urllib.parse import unquote
from sanic.compat import stat_async
from sanic.exceptions import (
ContentRangeError,
FileNotFound,
HeaderNotFound,
InvalidUsage,
)
from sanic.handlers import ContentRangeHandler
from sanic.log import error_logger
from sanic.response import HTTPResponse, file, file_stream
async def _static_request_handler(
file_or_directory,
use_modified_since,
use_content_range,
stream_large_files,
request,
content_type=None,
file_uri=None,
):
# Using this to determine if the URL is trying to break out of the path
# served. os.path.realpath seems to be very slow
if file_uri and "../" in file_uri:
raise InvalidUsage("Invalid URL")
# Merge served directory and requested file if provided
# Strip all / that in the beginning of the URL to help prevent python
# from herping a derp and treating the uri as an absolute path
root_path = file_path = file_or_directory
if file_uri:
file_path = path.join(file_or_directory, sub("^[/]*", "", file_uri))
# URL decode the path sent by the browser otherwise we won't be able to
# match filenames which got encoded (filenames with spaces etc)
file_path = path.abspath(unquote(file_path))
if not file_path.startswith(path.abspath(unquote(root_path))):
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={file_uri}"
)
raise FileNotFound(
"File not found", path=file_or_directory, relative_url=file_uri
)
try:
headers = {}
# Check if the client has been sent this file before
# and it has not been modified since
stats = None
if use_modified_since:
stats = await stat_async(file_path)
modified_since = strftime(
"%a, %d %b %Y %H:%M:%S GMT", gmtime(stats.st_mtime)
)
if request.headers.get("If-Modified-Since") == modified_since:
return HTTPResponse(status=304)
headers["Last-Modified"] = modified_since
_range = None
if use_content_range:
_range = None
if not stats:
stats = await stat_async(file_path)
headers["Accept-Ranges"] = "bytes"
headers["Content-Length"] = str(stats.st_size)
if request.method != "HEAD":
try:
_range = ContentRangeHandler(request, stats)
except HeaderNotFound:
pass
else:
del headers["Content-Length"]
for key, value in _range.headers.items():
headers[key] = value
headers["Content-Type"] = (
content_type or guess_type(file_path)[0] or "text/plain"
)
if request.method == "HEAD":
return HTTPResponse(headers=headers)
else:
if stream_large_files:
if type(stream_large_files) == int:
threshold = stream_large_files
else:
threshold = 1024 * 1024
if not stats:
stats = await stat_async(file_path)
if stats.st_size >= threshold:
return await file_stream(
file_path, headers=headers, _range=_range
)
return await file(file_path, headers=headers, _range=_range)
except ContentRangeError:
raise
except Exception:
error_logger.exception(
f"File not found: path={file_or_directory}, "
f"relative_url={file_uri}"
)
raise FileNotFound(
"File not found", path=file_or_directory, relative_url=file_uri
)
def register(
app,
uri: str,
file_or_directory: Union[str, bytes, PurePath],
pattern,
use_modified_since,
use_content_range,
stream_large_files,
name: str = "static",
host=None,
strict_slashes=None,
content_type=None,
):
# TODO: Though sanic is not a file server, I feel like we should at least
# make a good effort here. Modified-since is nice, but we could
# also look into etags, expires, and caching
"""
Register a static directory handler with Sanic by adding a route to the
router and registering a handler.
:param app: Sanic
:param file_or_directory: File or directory path to serve from
:type file_or_directory: Union[str,bytes,Path]
:param uri: URL to serve from
:type uri: str
:param pattern: regular expression used to match files in the URL
:param use_modified_since: If true, send file modified time, and return
not modified if the browser's matches the
server's
:param use_content_range: If true, process header for range requests
and sends the file part that is requested
:param stream_large_files: If true, use the file_stream() handler rather
than the file() handler to send the file
If this is an integer, this represents the
threshold size to switch to file_stream()
:param name: user defined name used for url_for
:type name: str
:param content_type: user defined content type for header
:return: registered static routes
:rtype: List[sanic.router.Route]
"""
if isinstance(file_or_directory, bytes):
file_or_directory = file_or_directory.decode("utf-8")
elif isinstance(file_or_directory, PurePath):
file_or_directory = str(file_or_directory)
elif not isinstance(file_or_directory, str):
raise ValueError("Invalid file path string.")
# If we're not trying to match a file directly,
# serve from the folder
if not path.isfile(file_or_directory):
uri += "<file_uri:" + pattern + ">"
# special prefix for static files
if not name.startswith("_static_"):
name = f"_static_{name}"
_handler = wraps(_static_request_handler)(
partial(
_static_request_handler,
file_or_directory,
use_modified_since,
use_content_range,
stream_large_files,
content_type=content_type,
)
)
_routes, _ = app.route(
uri,
methods=["GET", "HEAD"],
name=name,
host=host,
strict_slashes=strict_slashes,
)(_handler)
return _routes

View File

@ -44,7 +44,7 @@ def str_to_bool(val: str) -> bool:
def load_module_from_file_location(
location: Union[bytes, str, Path], encoding: str = "utf8", *args, **kwargs
):
): # noqa
"""Returns loaded module provided as a file path.
:param args:

View File

@ -92,6 +92,9 @@ class CompositionView:
self.handlers = {}
self.name = self.__class__.__name__
def __name__(self):
return self.name
def add(self, methods, handler, stream=False):
if stream:
handler.is_stream = stream

View File

@ -84,6 +84,7 @@ ujson = "ujson>=1.35" + env_dependency
uvloop = "uvloop>=0.5.3,<0.15.0" + env_dependency
requirements = [
"sanic-routing",
"httptools>=0.0.10",
uvloop,
ujson,

View File

@ -4,6 +4,8 @@ from pytest import mark
import sanic.router
from sanic.request import Request
seed("Pack my box with five dozen liquor jugs.")
@ -23,8 +25,17 @@ class TestSanicRouteResolution:
route_to_call = choice(simple_routes)
result = benchmark.pedantic(
router._get,
("/{}".format(route_to_call[-1]), route_to_call[0], "localhost"),
router.get,
(
Request(
"/{}".format(route_to_call[-1]).encode(),
{"host": "localhost"},
"v1",
route_to_call[0],
None,
None,
),
),
iterations=1000,
rounds=1000,
)
@ -47,8 +58,17 @@ class TestSanicRouteResolution:
print("{} -> {}".format(route_to_call[-1], url))
result = benchmark.pedantic(
router._get,
("/{}".format(url), route_to_call[0], "localhost"),
router.get,
(
Request(
"/{}".format(url).encode(),
{"host": "localhost"},
"v1",
route_to_call[0],
None,
None,
),
),
iterations=1000,
rounds=1000,
)

View File

@ -4,12 +4,16 @@ import string
import sys
import uuid
from typing import Tuple
import pytest
from sanic_routing.exceptions import RouteExists
from sanic_testing import TestManager
from sanic import Sanic
from sanic.router import RouteExists, Router
from sanic.constants import HTTP_METHODS
from sanic.router import Router
random.seed("Pack my box with five dozen liquor jugs.")
@ -38,12 +42,12 @@ async def _handler(request):
TYPE_TO_GENERATOR_MAP = {
"string": lambda: "".join(
[random.choice(string.ascii_letters + string.digits) for _ in range(4)]
[random.choice(string.ascii_lowercase) for _ in range(4)]
),
"int": lambda: random.choice(range(1000000)),
"number": lambda: random.random(),
"alpha": lambda: "".join(
[random.choice(string.ascii_letters) for _ in range(4)]
[random.choice(string.ascii_lowercase) for _ in range(4)]
),
"uuid": lambda: str(uuid.uuid1()),
}
@ -52,7 +56,7 @@ TYPE_TO_GENERATOR_MAP = {
class RouteStringGenerator:
ROUTE_COUNT_PER_DEPTH = 100
HTTP_METHODS = ["GET", "PUT", "POST", "PATCH", "DELETE", "OPTION"]
HTTP_METHODS = HTTP_METHODS
ROUTE_PARAM_TYPES = ["string", "int", "number", "alpha", "uuid"]
def generate_random_direct_route(self, max_route_depth=4):
@ -105,12 +109,12 @@ class RouteStringGenerator:
@pytest.fixture(scope="function")
def sanic_router(app):
# noinspection PyProtectedMember
def _setup(route_details: tuple) -> (Router, tuple):
router = Router(app)
def _setup(route_details: tuple) -> Tuple[Router, tuple]:
router = Router()
added_router = []
for method, route in route_details:
try:
router._add(
router.add(
uri=f"/{route}",
methods=frozenset({method}),
host="localhost",
@ -119,6 +123,7 @@ def sanic_router(app):
added_router.append((method, route))
except RouteExists:
pass
router.finalize()
return router, added_router
return _setup
@ -137,5 +142,4 @@ def url_param_generator():
@pytest.fixture(scope="function")
def app(request):
app = Sanic(request.node.name)
# TestManager(app)
return app

View File

@ -118,7 +118,7 @@ def test_app_route_raise_value_error(app):
def test_app_handle_request_handler_is_none(app, monkeypatch):
def mockreturn(*args, **kwargs):
return None, [], {}, "", "", None, False
return None, {}, "", "", False
# Not sure how to make app.router.get() return None, so use mock here.
monkeypatch.setattr(app.router, "get", mockreturn)

View File

@ -45,7 +45,8 @@ def protocol(transport):
return transport.get_protocol()
def test_listeners_triggered(app):
def test_listeners_triggered():
app = Sanic("app")
before_server_start = False
after_server_start = False
before_server_stop = False
@ -71,6 +72,10 @@ def test_listeners_triggered(app):
nonlocal after_server_stop
after_server_stop = True
@app.route("/")
def handler(request):
return text("...")
class CustomServer(uvicorn.Server):
def install_signal_handlers(self):
pass
@ -121,6 +126,10 @@ def test_listeners_triggered_async(app):
nonlocal after_server_stop
after_server_stop = True
@app.route("/")
def handler(request):
return text("...")
class CustomServer(uvicorn.Server):
def install_signal_handlers(self):
pass
@ -325,7 +334,7 @@ async def test_cookie_customization(app):
@pytest.mark.asyncio
async def test_json_content_type(app):
async def test_content_type(app):
@app.get("/json")
def send_json(request):
return json({"foo": "bar"})

View File

@ -4,6 +4,8 @@ import asyncio
def test_bad_request_response(app):
lines = []
app.get("/")(lambda x: ...)
@app.listener("after_server_start")
async def _request(sanic, loop):
connect = asyncio.open_connection("127.0.0.1", 42101)

View File

@ -110,6 +110,11 @@ def test_bp_group(app: Sanic):
global MIDDLEWARE_INVOKE_COUNTER
MIDDLEWARE_INVOKE_COUNTER["request"] += 1
@blueprint_group_1.middleware
def blueprint_group_1_middleware_not_called(request):
global MIDDLEWARE_INVOKE_COUNTER
MIDDLEWARE_INVOKE_COUNTER["request"] += 1
@blueprint_3.route("/")
def blueprint_3_default_route(request):
return text("BP3_OK")
@ -142,7 +147,7 @@ def test_bp_group(app: Sanic):
assert response.text == "BP3_OK"
assert MIDDLEWARE_INVOKE_COUNTER["response"] == 3
assert MIDDLEWARE_INVOKE_COUNTER["request"] == 2
assert MIDDLEWARE_INVOKE_COUNTER["request"] == 4
def test_bp_group_list_operations(app: Sanic):
@ -179,3 +184,19 @@ def test_bp_group_list_operations(app: Sanic):
assert len(blueprint_group_1) == 2
assert blueprint_group_1.url_prefix == "/bp"
def test_bp_group_as_list():
blueprint_1 = Blueprint("blueprint_1", url_prefix="/bp1")
blueprint_2 = Blueprint("blueprint_2", url_prefix="/bp2")
blueprint_group_1 = Blueprint.group([blueprint_1, blueprint_2])
assert len(blueprint_group_1) == 2
def test_bp_group_as_nested_group():
blueprint_1 = Blueprint("blueprint_1", url_prefix="/bp1")
blueprint_2 = Blueprint("blueprint_2", url_prefix="/bp2")
blueprint_group_1 = Blueprint.group(
Blueprint.group(blueprint_1, blueprint_2)
)
assert len(blueprint_group_1) == 2

View File

@ -209,18 +209,28 @@ def test_bp_with_host(app):
app.blueprint(bp)
headers = {"Host": "example.com"}
request, response = app.test_client.get("/test1/", headers=headers)
assert response.text == "Hello"
headers = {"Host": "sub.example.com"}
request, response = app.test_client.get("/test1/", headers=headers)
assert response.text == "Hello subdomain!"
assert response.body == b"Hello subdomain!"
def test_several_bp_with_host(app):
bp = Blueprint("test_text", url_prefix="/test", host="example.com")
bp2 = Blueprint("test_text2", url_prefix="/test", host="sub.example.com")
bp = Blueprint(
"test_text",
url_prefix="/test",
host="example.com",
strict_slashes=True,
)
bp2 = Blueprint(
"test_text2",
url_prefix="/test",
host="sub.example.com",
strict_slashes=True,
)
@bp.route("/")
def handler(request):
@ -240,6 +250,7 @@ def test_several_bp_with_host(app):
assert bp.host == "example.com"
headers = {"Host": "example.com"}
request, response = app.test_client.get("/test/", headers=headers)
assert response.text == "Hello"
assert bp2.host == "sub.example.com"
@ -352,6 +363,29 @@ def test_bp_middleware(app):
assert response.text == "FAIL"
def test_bp_middleware_with_route(app):
blueprint = Blueprint("test_bp_middleware")
@blueprint.middleware("response")
async def process_response(request, response):
return text("OK")
@app.route("/")
async def handler(request):
return text("FAIL")
@blueprint.route("/bp")
async def bp_handler(request):
return text("FAIL")
app.blueprint(blueprint)
request, response = app.test_client.get("/bp")
assert response.status == 200
assert response.text == "OK"
def test_bp_middleware_order(app):
blueprint = Blueprint("test_bp_middleware_order")
order = list()
@ -425,6 +459,7 @@ def test_bp_exception_handler(app):
def test_bp_listeners(app):
app.route("/")(lambda x: x)
blueprint = Blueprint("test_middleware")
order = []
@ -537,19 +572,19 @@ def test_bp_shorthand(app):
app.blueprint(blueprint)
request, response = app.test_client.get("/get")
assert response.text == "OK"
assert response.body == b"OK"
request, response = app.test_client.post("/get")
assert response.status == 405
request, response = app.test_client.put("/put")
assert response.text == "OK"
assert response.body == b"OK"
request, response = app.test_client.get("/post")
assert response.status == 405
request, response = app.test_client.post("/post")
assert response.text == "OK"
assert response.body == b"OK"
request, response = app.test_client.get("/post")
assert response.status == 405
@ -561,19 +596,19 @@ def test_bp_shorthand(app):
assert response.status == 405
request, response = app.test_client.options("/options")
assert response.text == "OK"
assert response.body == b"OK"
request, response = app.test_client.get("/options")
assert response.status == 405
request, response = app.test_client.patch("/patch")
assert response.text == "OK"
assert response.body == b"OK"
request, response = app.test_client.get("/patch")
assert response.status == 405
request, response = app.test_client.delete("/delete")
assert response.text == "OK"
assert response.body == b"OK"
request, response = app.test_client.get("/delete")
assert response.status == 405
@ -699,7 +734,8 @@ def test_blueprint_middleware_with_args(app: Sanic):
@pytest.mark.parametrize("file_name", ["test.file"])
def test_static_blueprint_name(app: Sanic, static_file_directory, file_name):
def test_static_blueprint_name(static_file_directory, file_name):
app = Sanic("app")
current_file = inspect.getfile(inspect.currentframe())
with open(current_file, "rb") as file:
file.read()
@ -814,17 +850,19 @@ def test_duplicate_blueprint(app):
)
def test_strict_slashes_behavior_adoption(app):
def test_strict_slashes_behavior_adoption():
app = Sanic("app")
app.strict_slashes = True
bp = Blueprint("bp")
bp2 = Blueprint("bp2", strict_slashes=False)
@app.get("/test")
def handler_test(request):
return text("Test")
assert app.test_client.get("/test")[1].status == 200
assert app.test_client.get("/test/")[1].status == 404
bp = Blueprint("bp")
@app.get("/f1", strict_slashes=False)
def f1(request):
return text("f1")
@bp.get("/one", strict_slashes=False)
def one(request):
@ -834,7 +872,15 @@ def test_strict_slashes_behavior_adoption(app):
def second(request):
return text("second")
@bp2.get("/third")
def third(request):
return text("third")
app.blueprint(bp)
app.blueprint(bp2)
assert app.test_client.get("/test")[1].status == 200
assert app.test_client.get("/test/")[1].status == 404
assert app.test_client.get("/one")[1].status == 200
assert app.test_client.get("/one/")[1].status == 200
@ -842,19 +888,8 @@ def test_strict_slashes_behavior_adoption(app):
assert app.test_client.get("/second")[1].status == 200
assert app.test_client.get("/second/")[1].status == 404
bp2 = Blueprint("bp2", strict_slashes=False)
@bp2.get("/third")
def third(request):
return text("third")
app.blueprint(bp2)
assert app.test_client.get("/third")[1].status == 200
assert app.test_client.get("/third/")[1].status == 200
@app.get("/f1", strict_slashes=False)
def f1(request):
return text("f1")
assert app.test_client.get("/f1")[1].status == 200
assert app.test_client.get("/f1/")[1].status == 200

View File

@ -43,7 +43,7 @@ async def test_cookies_asgi(app):
response_cookies = SimpleCookie()
response_cookies.load(response.headers.get("set-cookie", {}))
assert response.text == "Cookies are: working!"
assert response.body == b"Cookies are: working!"
assert response_cookies["right_back"].value == "at you"

View File

@ -1,44 +1,44 @@
import pytest
# import pytest
from sanic.response import text
from sanic.router import RouteExists
# from sanic.response import text
# from sanic.router import RouteExists
@pytest.mark.parametrize(
"method,attr, expected",
[
("get", "text", "OK1 test"),
("post", "text", "OK2 test"),
("put", "text", "OK2 test"),
("delete", "status", 405),
],
)
def test_overload_dynamic_routes(app, method, attr, expected):
@app.route("/overload/<param>", methods=["GET"])
async def handler1(request, param):
return text("OK1 " + param)
# @pytest.mark.parametrize(
# "method,attr, expected",
# [
# ("get", "text", "OK1 test"),
# ("post", "text", "OK2 test"),
# ("put", "text", "OK2 test"),
# ("delete", "status", 405),
# ],
# )
# def test_overload_dynamic_routes(app, method, attr, expected):
# @app.route("/overload/<param>", methods=["GET"])
# async def handler1(request, param):
# return text("OK1 " + param)
@app.route("/overload/<param>", methods=["POST", "PUT"])
async def handler2(request, param):
return text("OK2 " + param)
# @app.route("/overload/<param>", methods=["POST", "PUT"])
# async def handler2(request, param):
# return text("OK2 " + param)
request, response = getattr(app.test_client, method)("/overload/test")
assert getattr(response, attr) == expected
# request, response = getattr(app.test_client, method)("/overload/test")
# assert getattr(response, attr) == expected
def test_overload_dynamic_routes_exist(app):
@app.route("/overload/<param>", methods=["GET"])
async def handler1(request, param):
return text("OK1 " + param)
# def test_overload_dynamic_routes_exist(app):
# @app.route("/overload/<param>", methods=["GET"])
# async def handler1(request, param):
# return text("OK1 " + param)
@app.route("/overload/<param>", methods=["POST", "PUT"])
async def handler2(request, param):
return text("OK2 " + param)
# @app.route("/overload/<param>", methods=["POST", "PUT"])
# async def handler2(request, param):
# return text("OK2 " + param)
# if this doesn't raise an error, than at least the below should happen:
# assert response.text == 'Duplicated'
with pytest.raises(RouteExists):
# # if this doesn't raise an error, than at least the below should happen:
# # assert response.text == 'Duplicated'
# with pytest.raises(RouteExists):
@app.route("/overload/<param>", methods=["PUT", "DELETE"])
async def handler3(request, param):
return text("Duplicated")
# @app.route("/overload/<param>", methods=["PUT", "DELETE"])
# async def handler3(request, param):
# return text("Duplicated")

View File

@ -126,8 +126,9 @@ def test_html_traceback_output_in_debug_mode():
assert response.status == 500
soup = BeautifulSoup(response.body, "html.parser")
html = str(soup)
print(html)
assert "response = handler(request, *args, **kwargs)" in html
assert "response = handler(request, **kwargs)" in html
assert "handler_4" in html
assert "foo = bar" in html
@ -151,7 +152,7 @@ def test_chained_exception_handler():
soup = BeautifulSoup(response.body, "html.parser")
html = str(soup)
assert "response = handler(request, *args, **kwargs)" in html
assert "response = handler(request, **kwargs)" in html
assert "handler_6" in html
assert "foo = 1 / arg" in html
assert "ValueError" in html

View File

@ -103,7 +103,13 @@ def test_logging_pass_customer_logconfig():
assert fmt._fmt == modified_config["formatters"]["access"]["format"]
@pytest.mark.parametrize("debug", (True, False))
@pytest.mark.parametrize(
"debug",
(
True,
False,
),
)
def test_log_connection_lost(app, debug, monkeypatch):
""" Should not log Connection lost exception on non debug """
stream = StringIO()
@ -117,7 +123,7 @@ def test_log_connection_lost(app, debug, monkeypatch):
request.transport.close()
return response
req, res = app.test_client.get("/conn_lost", debug=debug)
req, res = app.test_client.get("/conn_lost", debug=debug, allow_none=True)
assert res is None
log = stream.getvalue()

View File

@ -30,6 +30,23 @@ def test_middleware_request(app):
assert type(results[0]) is Request
def test_middleware_request_as_convenience(app):
results = []
@app.on_request
async def handler1(request):
results.append(request)
@app.route("/")
async def handler2(request):
return text("OK")
request, response = app.test_client.get("/")
assert response.text == "OK"
assert type(results[0]) is Request
def test_middleware_response(app):
results = []
@ -54,6 +71,54 @@ def test_middleware_response(app):
assert isinstance(results[2], HTTPResponse)
def test_middleware_response_as_convenience(app):
results = []
@app.on_request
async def process_request(request):
results.append(request)
@app.on_response
async def process_response(request, response):
results.append(request)
results.append(response)
@app.route("/")
async def handler(request):
return text("OK")
request, response = app.test_client.get("/")
assert response.text == "OK"
assert type(results[0]) is Request
assert type(results[1]) is Request
assert isinstance(results[2], HTTPResponse)
def test_middleware_response_as_convenience_called(app):
results = []
@app.on_request()
async def process_request(request):
results.append(request)
@app.on_response()
async def process_response(request, response):
results.append(request)
results.append(response)
@app.route("/")
async def handler(request):
return text("OK")
request, response = app.test_client.get("/")
assert response.text == "OK"
assert type(results[0]) is Request
assert type(results[1]) is Request
assert isinstance(results[2], HTTPResponse)
def test_middleware_response_exception(app):
result = {"status_code": "middleware not run"}
@ -102,6 +167,7 @@ def test_middleware_response_raise_exception(app, caplog):
async def process_response(request, response):
raise Exception("Exception at response middleware")
app.route("/")(lambda x: x)
with caplog.at_level(logging.ERROR):
reqrequest, response = app.test_client.get("/fail")
@ -129,7 +195,7 @@ def test_middleware_override_request(app):
async def handler(request):
return text("FAIL")
response = app.test_client.get("/", gather_request=False)
_, response = app.test_client.get("/", gather_request=False)
assert response.status == 200
assert response.text == "OK"

View File

@ -68,9 +68,12 @@ def handler(request):
@pytest.mark.parametrize("protocol", [3, 4])
def test_pickle_app(app, protocol):
app.route("/")(handler)
app.router.finalize()
app.router.reset()
p_app = pickle.dumps(app, protocol=protocol)
del app
up_p_app = pickle.loads(p_app)
up_p_app.router.finalize()
assert up_p_app
request, response = up_p_app.test_client.get("/")
assert response.text == "Hello"
@ -81,9 +84,12 @@ def test_pickle_app_with_bp(app, protocol):
bp = Blueprint("test_text")
bp.route("/")(handler)
app.blueprint(bp)
app.router.finalize()
app.router.reset()
p_app = pickle.dumps(app, protocol=protocol)
del app
up_p_app = pickle.loads(p_app)
up_p_app.router.finalize()
assert up_p_app
request, response = up_p_app.test_client.get("/")
assert response.text == "Hello"
@ -93,9 +99,12 @@ def test_pickle_app_with_bp(app, protocol):
def test_pickle_app_with_static(app, protocol):
app.route("/")(handler)
app.static("/static", "/tmp/static")
app.router.finalize()
app.router.reset()
p_app = pickle.dumps(app, protocol=protocol)
del app
up_p_app = pickle.loads(p_app)
up_p_app.router.finalize()
assert up_p_app
request, response = up_p_app.test_client.get("/static/missing.txt")
assert response.status == 404

View File

@ -5,6 +5,7 @@ import asyncio
import pytest
from sanic import Sanic
from sanic.blueprints import Blueprint
from sanic.constants import HTTP_METHODS
from sanic.exceptions import URLBuildError
@ -17,7 +18,9 @@ from sanic.response import text
@pytest.mark.parametrize("method", HTTP_METHODS)
def test_versioned_named_routes_get(app, method):
def test_versioned_named_routes_get(method):
app = Sanic("app")
bp = Blueprint("test_bp", url_prefix="/bp")
method = method.lower()
@ -32,7 +35,6 @@ def test_versioned_named_routes_get(app, method):
return text("OK")
else:
print(func)
raise
func = getattr(bp, method)
@ -43,15 +45,28 @@ def test_versioned_named_routes_get(app, method):
return text("OK")
else:
print(func)
raise
app.blueprint(bp)
assert app.router.routes_all[f"/v1/{method}"].name == route_name
assert (
app.router.routes_all[
(
"v1",
method,
)
].name
== f"app.{route_name}"
)
route = app.router.routes_all[f"/v1/bp/{method}"]
assert route.name == f"test_bp.{route_name2}"
route = app.router.routes_all[
(
"v1",
"bp",
method,
)
]
assert route.name == f"app.test_bp.{route_name2}"
assert app.url_for(route_name) == f"/v1/{method}"
url = app.url_for(f"test_bp.{route_name2}")
@ -60,16 +75,19 @@ def test_versioned_named_routes_get(app, method):
app.url_for("handler")
def test_shorthand_default_routes_get(app):
def test_shorthand_default_routes_get():
app = Sanic("app")
@app.get("/get")
def handler(request):
return text("OK")
assert app.router.routes_all["/get"].name == "handler"
assert app.router.routes_all[("get",)].name == "app.handler"
assert app.url_for("handler") == "/get"
def test_shorthand_named_routes_get(app):
def test_shorthand_named_routes_get():
app = Sanic("app")
bp = Blueprint("test_bp", url_prefix="/bp")
@app.get("/get", name="route_get")
@ -82,84 +100,106 @@ def test_shorthand_named_routes_get(app):
app.blueprint(bp)
assert app.router.routes_all["/get"].name == "route_get"
assert app.router.routes_all[("get",)].name == "app.route_get"
assert app.url_for("route_get") == "/get"
with pytest.raises(URLBuildError):
app.url_for("handler")
assert app.router.routes_all["/bp/get"].name == "test_bp.route_bp"
assert (
app.router.routes_all[
(
"bp",
"get",
)
].name
== "app.test_bp.route_bp"
)
assert app.url_for("test_bp.route_bp") == "/bp/get"
with pytest.raises(URLBuildError):
app.url_for("test_bp.handler2")
def test_shorthand_named_routes_post(app):
def test_shorthand_named_routes_post():
app = Sanic("app")
@app.post("/post", name="route_name")
def handler(request):
return text("OK")
assert app.router.routes_all["/post"].name == "route_name"
assert app.router.routes_all[("post",)].name == "app.route_name"
assert app.url_for("route_name") == "/post"
with pytest.raises(URLBuildError):
app.url_for("handler")
def test_shorthand_named_routes_put(app):
def test_shorthand_named_routes_put():
app = Sanic("app")
@app.put("/put", name="route_put")
def handler(request):
return text("OK")
assert app.router.routes_all["/put"].name == "route_put"
assert app.router.routes_all[("put",)].name == "app.route_put"
assert app.url_for("route_put") == "/put"
with pytest.raises(URLBuildError):
app.url_for("handler")
def test_shorthand_named_routes_delete(app):
def test_shorthand_named_routes_delete():
app = Sanic("app")
@app.delete("/delete", name="route_delete")
def handler(request):
return text("OK")
assert app.router.routes_all["/delete"].name == "route_delete"
assert app.router.routes_all[("delete",)].name == "app.route_delete"
assert app.url_for("route_delete") == "/delete"
with pytest.raises(URLBuildError):
app.url_for("handler")
def test_shorthand_named_routes_patch(app):
def test_shorthand_named_routes_patch():
app = Sanic("app")
@app.patch("/patch", name="route_patch")
def handler(request):
return text("OK")
assert app.router.routes_all["/patch"].name == "route_patch"
assert app.router.routes_all[("patch",)].name == "app.route_patch"
assert app.url_for("route_patch") == "/patch"
with pytest.raises(URLBuildError):
app.url_for("handler")
def test_shorthand_named_routes_head(app):
def test_shorthand_named_routes_head():
app = Sanic("app")
@app.head("/head", name="route_head")
def handler(request):
return text("OK")
assert app.router.routes_all["/head"].name == "route_head"
assert app.router.routes_all[("head",)].name == "app.route_head"
assert app.url_for("route_head") == "/head"
with pytest.raises(URLBuildError):
app.url_for("handler")
def test_shorthand_named_routes_options(app):
def test_shorthand_named_routes_options():
app = Sanic("app")
@app.options("/options", name="route_options")
def handler(request):
return text("OK")
assert app.router.routes_all["/options"].name == "route_options"
assert app.router.routes_all[("options",)].name == "app.route_options"
assert app.url_for("route_options") == "/options"
with pytest.raises(URLBuildError):
app.url_for("handler")
def test_named_static_routes(app):
def test_named_static_routes():
app = Sanic("app")
@app.route("/test", name="route_test")
async def handler1(request):
return text("OK1")
@ -168,20 +208,21 @@ def test_named_static_routes(app):
async def handler2(request):
return text("OK2")
assert app.router.routes_all["/test"].name == "route_test"
assert app.router.routes_static["/test"].name == "route_test"
assert app.router.routes_all[("test",)].name == "app.route_test"
assert app.router.routes_static[("test",)].name == "app.route_test"
assert app.url_for("route_test") == "/test"
with pytest.raises(URLBuildError):
app.url_for("handler1")
assert app.router.routes_all["/pizazz"].name == "route_pizazz"
assert app.router.routes_static["/pizazz"].name == "route_pizazz"
assert app.router.routes_all[("pizazz",)].name == "app.route_pizazz"
assert app.router.routes_static[("pizazz",)].name == "app.route_pizazz"
assert app.url_for("route_pizazz") == "/pizazz"
with pytest.raises(URLBuildError):
app.url_for("handler2")
def test_named_dynamic_route(app):
def test_named_dynamic_route():
app = Sanic("app")
results = []
@app.route("/folder/<name>", name="route_dynamic")
@ -189,52 +230,83 @@ def test_named_dynamic_route(app):
results.append(name)
return text("OK")
assert app.router.routes_all["/folder/<name>"].name == "route_dynamic"
assert (
app.router.routes_all[
(
"folder",
"<name>",
)
].name
== "app.route_dynamic"
)
assert app.url_for("route_dynamic", name="test") == "/folder/test"
with pytest.raises(URLBuildError):
app.url_for("handler")
def test_dynamic_named_route_regex(app):
def test_dynamic_named_route_regex():
app = Sanic("app")
@app.route("/folder/<folder_id:[A-Za-z0-9]{0,4}>", name="route_re")
async def handler(request, folder_id):
return text("OK")
route = app.router.routes_all["/folder/<folder_id:[A-Za-z0-9]{0,4}>"]
assert route.name == "route_re"
route = app.router.routes_all[
(
"folder",
"<folder_id:[A-Za-z0-9]{0,4}>",
)
]
assert route.name == "app.route_re"
assert app.url_for("route_re", folder_id="test") == "/folder/test"
with pytest.raises(URLBuildError):
app.url_for("handler")
def test_dynamic_named_route_path(app):
def test_dynamic_named_route_path():
app = Sanic("app")
@app.route("/<path:path>/info", name="route_dynamic_path")
async def handler(request, path):
return text("OK")
route = app.router.routes_all["/<path:path>/info"]
assert route.name == "route_dynamic_path"
route = app.router.routes_all[
(
"<path:path>",
"info",
)
]
assert route.name == "app.route_dynamic_path"
assert app.url_for("route_dynamic_path", path="path/1") == "/path/1/info"
with pytest.raises(URLBuildError):
app.url_for("handler")
def test_dynamic_named_route_unhashable(app):
def test_dynamic_named_route_unhashable():
app = Sanic("app")
@app.route(
"/folder/<unhashable:[A-Za-z0-9/]+>/end/", name="route_unhashable"
)
async def handler(request, unhashable):
return text("OK")
route = app.router.routes_all["/folder/<unhashable:[A-Za-z0-9/]+>/end/"]
assert route.name == "route_unhashable"
route = app.router.routes_all[
(
"folder",
"<unhashable:[A-Za-z0-9/]+>",
"end",
)
]
assert route.name == "app.route_unhashable"
url = app.url_for("route_unhashable", unhashable="test/asdf")
assert url == "/folder/test/asdf/end"
with pytest.raises(URLBuildError):
app.url_for("handler")
def test_websocket_named_route(app):
def test_websocket_named_route():
app = Sanic("app")
ev = asyncio.Event()
@app.websocket("/ws", name="route_ws")
@ -242,26 +314,29 @@ def test_websocket_named_route(app):
assert ws.subprotocol is None
ev.set()
assert app.router.routes_all["/ws"].name == "route_ws"
assert app.router.routes_all[("ws",)].name == "app.route_ws"
assert app.url_for("route_ws") == "/ws"
with pytest.raises(URLBuildError):
app.url_for("handler")
def test_websocket_named_route_with_subprotocols(app):
def test_websocket_named_route_with_subprotocols():
app = Sanic("app")
results = []
@app.websocket("/ws", subprotocols=["foo", "bar"], name="route_ws")
async def handler(request, ws):
results.append(ws.subprotocol)
assert app.router.routes_all["/ws"].name == "route_ws"
assert app.router.routes_all[("ws",)].name == "app.route_ws"
assert app.url_for("route_ws") == "/ws"
with pytest.raises(URLBuildError):
app.url_for("handler")
def test_static_add_named_route(app):
def test_static_add_named_route():
app = Sanic("app")
async def handler1(request):
return text("OK1")
@ -271,20 +346,21 @@ def test_static_add_named_route(app):
app.add_route(handler1, "/test", name="route_test")
app.add_route(handler2, "/test2", name="route_test2")
assert app.router.routes_all["/test"].name == "route_test"
assert app.router.routes_static["/test"].name == "route_test"
assert app.router.routes_all[("test",)].name == "app.route_test"
assert app.router.routes_static[("test",)].name == "app.route_test"
assert app.url_for("route_test") == "/test"
with pytest.raises(URLBuildError):
app.url_for("handler1")
assert app.router.routes_all["/test2"].name == "route_test2"
assert app.router.routes_static["/test2"].name == "route_test2"
assert app.router.routes_all[("test2",)].name == "app.route_test2"
assert app.router.routes_static[("test2",)].name == "app.route_test2"
assert app.url_for("route_test2") == "/test2"
with pytest.raises(URLBuildError):
app.url_for("handler2")
def test_dynamic_add_named_route(app):
def test_dynamic_add_named_route():
app = Sanic("app")
results = []
async def handler(request, name):
@ -292,13 +368,17 @@ def test_dynamic_add_named_route(app):
return text("OK")
app.add_route(handler, "/folder/<name>", name="route_dynamic")
assert app.router.routes_all["/folder/<name>"].name == "route_dynamic"
assert (
app.router.routes_all[("folder", "<name>")].name == "app.route_dynamic"
)
assert app.url_for("route_dynamic", name="test") == "/folder/test"
with pytest.raises(URLBuildError):
app.url_for("handler")
def test_dynamic_add_named_route_unhashable(app):
def test_dynamic_add_named_route_unhashable():
app = Sanic("app")
async def handler(request, unhashable):
return text("OK")
@ -307,15 +387,23 @@ def test_dynamic_add_named_route_unhashable(app):
"/folder/<unhashable:[A-Za-z0-9/]+>/end/",
name="route_unhashable",
)
route = app.router.routes_all["/folder/<unhashable:[A-Za-z0-9/]+>/end/"]
assert route.name == "route_unhashable"
route = app.router.routes_all[
(
"folder",
"<unhashable:[A-Za-z0-9/]+>",
"end",
)
]
assert route.name == "app.route_unhashable"
url = app.url_for("route_unhashable", unhashable="folder1")
assert url == "/folder/folder1/end"
with pytest.raises(URLBuildError):
app.url_for("handler")
def test_overload_routes(app):
def test_overload_routes():
app = Sanic("app")
@app.route("/overload", methods=["GET"], name="route_first")
async def handler1(request):
return text("OK1")
@ -342,7 +430,7 @@ def test_overload_routes(app):
request, response = app.test_client.put(app.url_for("route_second"))
assert response.text == "OK2"
assert app.router.routes_all["/overload"].name == "route_first"
assert app.router.routes_all[("overload",)].name == "app.route_first"
with pytest.raises(URLBuildError):
app.url_for("handler1")

View File

@ -13,7 +13,7 @@ def test_payload_too_large_from_error_handler(app):
def handler_exception(request, exception):
return text("Payload Too Large from error_handler.", 413)
response = app.test_client.get("/1", gather_request=False)
_, response = app.test_client.get("/1", gather_request=False)
assert response.status == 413
assert response.text == "Payload Too Large from error_handler."
@ -25,7 +25,7 @@ def test_payload_too_large_at_data_received_default(app):
async def handler2(request):
return text("OK")
response = app.test_client.get("/1", gather_request=False)
_, response = app.test_client.get("/1", gather_request=False)
assert response.status == 413
assert "Request header" in response.text
@ -38,6 +38,6 @@ def test_payload_too_large_at_on_header_default(app):
return text("OK")
data = "a" * 1000
response = app.test_client.post("/1", gather_request=False, data=data)
_, response = app.test_client.post("/1", gather_request=False, data=data)
assert response.status == 413
assert "Request body" in response.text

View File

@ -1,4 +1,4 @@
from urllib.parse import quote
from urllib.parse import quote, unquote
import pytest
@ -109,7 +109,14 @@ def test_redirect_with_header_injection(redirect_app):
assert not response.text.startswith("test-body")
@pytest.mark.parametrize("test_str", ["sanic-test", "sanictest", "sanic test"])
@pytest.mark.parametrize(
"test_str",
[
"sanic-test",
"sanictest",
"sanic test",
],
)
def test_redirect_with_params(app, test_str):
use_in_uri = quote(test_str)
@ -117,7 +124,7 @@ def test_redirect_with_params(app, test_str):
async def init_handler(request, test):
return redirect(f"/api/v2/test/{use_in_uri}/")
@app.route("/api/v2/test/<test>/")
@app.route("/api/v2/test/<test>/", unquote=True)
async def target_handler(request, test):
assert test == test_str
return text("OK")
@ -125,4 +132,4 @@ def test_redirect_with_params(app, test_str):
_, response = app.test_client.get(f"/api/v1/test/{use_in_uri}/")
assert response.status == 200
assert response.content == b"OK"
assert response.body == b"OK"

View File

@ -42,6 +42,8 @@ def write_app(filename, **runargs):
app = Sanic(__name__)
app.route("/")(lambda x: x)
@app.listener("after_server_start")
def complete(*args):
print("complete", os.getpid(), {text!r})

View File

@ -10,7 +10,6 @@ import pytest
from sanic_testing.testing import (
ASGI_BASE_URL,
ASGI_HOST,
ASGI_PORT,
HOST,
PORT,
@ -19,7 +18,7 @@ from sanic_testing.testing import (
from sanic import Blueprint, Sanic
from sanic.exceptions import ServerError
from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, Request, RequestParameters
from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, RequestParameters
from sanic.response import html, json, text
@ -35,7 +34,7 @@ def test_sync(app):
request, response = app.test_client.get("/")
assert response.text == "Hello"
assert response.body == b"Hello"
@pytest.mark.asyncio
@ -46,7 +45,7 @@ async def test_sync_asgi(app):
request, response = await app.asgi_client.get("/")
assert response.text == "Hello"
assert response.body == b"Hello"
def test_ip(app):
@ -56,7 +55,7 @@ def test_ip(app):
request, response = app.test_client.get("/")
assert response.text == "127.0.0.1"
assert response.body == b"127.0.0.1"
@pytest.mark.asyncio
@ -67,10 +66,12 @@ async def test_url_asgi(app):
request, response = await app.asgi_client.get("/")
if response.text.endswith("/") and not ASGI_BASE_URL.endswith("/"):
response.text[:-1] == ASGI_BASE_URL
if response.body.decode().endswith("/") and not ASGI_BASE_URL.endswith(
"/"
):
response.body[:-1] == ASGI_BASE_URL.encode()
else:
assert response.text == ASGI_BASE_URL
assert response.body == ASGI_BASE_URL.encode()
def test_text(app):
@ -80,7 +81,7 @@ def test_text(app):
request, response = app.test_client.get("/")
assert response.text == "Hello"
assert response.body == b"Hello"
def test_html(app):
@ -109,13 +110,13 @@ def test_html(app):
request, response = app.test_client.get("/")
assert response.content_type == "text/html; charset=utf-8"
assert response.text == "<h1>Hello</h1>"
assert response.body == b"<h1>Hello</h1>"
request, response = app.test_client.get("/foo")
assert response.text == "<h1>Foo</h1>"
assert response.body == b"<h1>Foo</h1>"
request, response = app.test_client.get("/bar")
assert response.text == "<h1>Bar object repr</h1>"
assert response.body == b"<h1>Bar object repr</h1>"
@pytest.mark.asyncio
@ -126,7 +127,7 @@ async def test_text_asgi(app):
request, response = await app.asgi_client.get("/")
assert response.text == "Hello"
assert response.body == b"Hello"
def test_headers(app):
@ -186,7 +187,7 @@ def test_invalid_response(app):
request, response = app.test_client.get("/")
assert response.status == 500
assert response.text == "Internal Server Error."
assert response.body == b"Internal Server Error."
@pytest.mark.asyncio
@ -201,7 +202,7 @@ async def test_invalid_response_asgi(app):
request, response = await app.asgi_client.get("/")
assert response.status == 500
assert response.text == "Internal Server Error."
assert response.body == b"Internal Server Error."
def test_json(app):
@ -224,7 +225,7 @@ async def test_json_asgi(app):
request, response = await app.asgi_client.get("/")
results = json_loads(response.text)
results = json_loads(response.body)
assert results.get("test") is True
@ -237,7 +238,7 @@ def test_empty_json(app):
request, response = app.test_client.get("/")
assert response.status == 200
assert response.text == "null"
assert response.body == b"null"
@pytest.mark.asyncio
@ -249,7 +250,7 @@ async def test_empty_json_asgi(app):
request, response = await app.asgi_client.get("/")
assert response.status == 200
assert response.text == "null"
assert response.body == b"null"
def test_invalid_json(app):
@ -423,12 +424,12 @@ def test_content_type(app):
request, response = app.test_client.get("/")
assert request.content_type == DEFAULT_HTTP_CONTENT_TYPE
assert response.text == DEFAULT_HTTP_CONTENT_TYPE
assert response.body.decode() == DEFAULT_HTTP_CONTENT_TYPE
headers = {"content-type": "application/json"}
request, response = app.test_client.get("/", headers=headers)
assert request.content_type == "application/json"
assert response.text == "application/json"
assert response.body == b"application/json"
@pytest.mark.asyncio
@ -439,12 +440,12 @@ async def test_content_type_asgi(app):
request, response = await app.asgi_client.get("/")
assert request.content_type == DEFAULT_HTTP_CONTENT_TYPE
assert response.text == DEFAULT_HTTP_CONTENT_TYPE
assert response.body.decode() == DEFAULT_HTTP_CONTENT_TYPE
headers = {"content-type": "application/json"}
request, response = await app.asgi_client.get("/", headers=headers)
assert request.content_type == "application/json"
assert response.text == "application/json"
assert response.body == b"application/json"
def test_standard_forwarded(app):
@ -581,14 +582,15 @@ async def test_standard_forwarded_asgi(app):
"X-Scheme": "ws",
}
request, response = await app.asgi_client.get("/", headers=headers)
assert response.json() == {"for": "127.0.0.2", "proto": "ws"}
assert response.json == {"for": "127.0.0.2", "proto": "ws"}
assert request.remote_addr == "127.0.0.2"
assert request.scheme == "ws"
assert request.server_port == ASGI_PORT
app.config.FORWARDED_SECRET = "mySecret"
request, response = await app.asgi_client.get("/", headers=headers)
assert response.json() == {
assert response.json == {
"for": "[::2]",
"proto": "https",
"host": "me.tld",
@ -603,13 +605,13 @@ async def test_standard_forwarded_asgi(app):
# Empty Forwarded header -> use X-headers
headers["Forwarded"] = ""
request, response = await app.asgi_client.get("/", headers=headers)
assert response.json() == {"for": "127.0.0.2", "proto": "ws"}
assert response.json == {"for": "127.0.0.2", "proto": "ws"}
# Header present but not matching anything
request, response = await app.asgi_client.get(
"/", headers={"Forwarded": "."}
)
assert response.json() == {}
assert response.json == {}
# Forwarded header present but no matching secret -> use X-headers
headers = {
@ -617,13 +619,13 @@ async def test_standard_forwarded_asgi(app):
"X-Real-IP": "127.0.0.2",
}
request, response = await app.asgi_client.get("/", headers=headers)
assert response.json() == {"for": "127.0.0.2"}
assert response.json == {"for": "127.0.0.2"}
assert request.remote_addr == "127.0.0.2"
# Different formatting and hitting both ends of the header
headers = {"Forwarded": 'Secret="mySecret";For=127.0.0.4;Port=1234'}
request, response = await app.asgi_client.get("/", headers=headers)
assert response.json() == {
assert response.json == {
"for": "127.0.0.4",
"port": 1234,
"secret": "mySecret",
@ -632,7 +634,7 @@ async def test_standard_forwarded_asgi(app):
# Test escapes (modify this if you see anyone implementing quoted-pairs)
headers = {"Forwarded": 'for=test;quoted="\\,x=x;y=\\";secret=mySecret'}
request, response = await app.asgi_client.get("/", headers=headers)
assert response.json() == {
assert response.json == {
"for": "test",
"quoted": "\\,x=x;y=\\",
"secret": "mySecret",
@ -641,17 +643,17 @@ async def test_standard_forwarded_asgi(app):
# Secret insulated by malformed field #1
headers = {"Forwarded": "for=test;secret=mySecret;b0rked;proto=wss;"}
request, response = await app.asgi_client.get("/", headers=headers)
assert response.json() == {"for": "test", "secret": "mySecret"}
assert response.json == {"for": "test", "secret": "mySecret"}
# Secret insulated by malformed field #2
headers = {"Forwarded": "for=test;b0rked;secret=mySecret;proto=wss"}
request, response = await app.asgi_client.get("/", headers=headers)
assert response.json() == {"proto": "wss", "secret": "mySecret"}
assert response.json == {"proto": "wss", "secret": "mySecret"}
# Unexpected termination should not lose existing acceptable values
headers = {"Forwarded": "b0rked;secret=mySecret;proto=wss"}
request, response = await app.asgi_client.get("/", headers=headers)
assert response.json() == {"proto": "wss", "secret": "mySecret"}
assert response.json == {"proto": "wss", "secret": "mySecret"}
# Field normalization
headers = {
@ -659,7 +661,7 @@ async def test_standard_forwarded_asgi(app):
'PATH="/With%20Spaces%22Quoted%22/sanicApp?key=val";SECRET=mySecret'
}
request, response = await app.asgi_client.get("/", headers=headers)
assert response.json() == {
assert response.json == {
"proto": "wss",
"by": "[cafe::8000]",
"host": "a:2",
@ -671,7 +673,10 @@ async def test_standard_forwarded_asgi(app):
app.config.FORWARDED_SECRET = "_proxySecret"
headers = {"Forwarded": "for=1.2.3.4; by=_proxySecret"}
request, response = await app.asgi_client.get("/", headers=headers)
assert response.json() == {"for": "1.2.3.4", "by": "_proxySecret"}
assert response.json == {
"for": "1.2.3.4",
"by": "_proxySecret",
}
def test_remote_addr_with_two_proxies(app):
@ -685,33 +690,33 @@ def test_remote_addr_with_two_proxies(app):
headers = {"X-Real-IP": "127.0.0.2", "X-Forwarded-For": "127.0.1.1"}
request, response = app.test_client.get("/", headers=headers)
assert request.remote_addr == "127.0.0.2"
assert response.text == "127.0.0.2"
assert response.body == b"127.0.0.2"
headers = {"X-Forwarded-For": "127.0.1.1"}
request, response = app.test_client.get("/", headers=headers)
assert request.remote_addr == ""
assert response.text == ""
assert response.body == b""
headers = {"X-Forwarded-For": "127.0.0.1, 127.0.1.2"}
request, response = app.test_client.get("/", headers=headers)
assert request.remote_addr == "127.0.0.1"
assert response.text == "127.0.0.1"
assert response.body == b"127.0.0.1"
request, response = app.test_client.get("/")
assert request.remote_addr == ""
assert response.text == ""
assert response.body == b""
headers = {"X-Forwarded-For": "127.0.0.1, , ,,127.0.1.2"}
request, response = app.test_client.get("/", headers=headers)
assert request.remote_addr == "127.0.0.1"
assert response.text == "127.0.0.1"
assert response.body == b"127.0.0.1"
headers = {
"X-Forwarded-For": ", 127.0.2.2, , ,127.0.0.1, , ,,127.0.1.2"
}
request, response = app.test_client.get("/", headers=headers)
assert request.remote_addr == "127.0.0.1"
assert response.text == "127.0.0.1"
assert response.body == b"127.0.0.1"
@pytest.mark.asyncio
@ -726,33 +731,33 @@ async def test_remote_addr_with_two_proxies_asgi(app):
headers = {"X-Real-IP": "127.0.0.2", "X-Forwarded-For": "127.0.1.1"}
request, response = await app.asgi_client.get("/", headers=headers)
assert request.remote_addr == "127.0.0.2"
assert response.text == "127.0.0.2"
assert response.body == b"127.0.0.2"
headers = {"X-Forwarded-For": "127.0.1.1"}
request, response = await app.asgi_client.get("/", headers=headers)
assert request.remote_addr == ""
assert response.text == ""
assert response.body == b""
headers = {"X-Forwarded-For": "127.0.0.1, 127.0.1.2"}
request, response = await app.asgi_client.get("/", headers=headers)
assert request.remote_addr == "127.0.0.1"
assert response.text == "127.0.0.1"
assert response.body == b"127.0.0.1"
request, response = await app.asgi_client.get("/")
assert request.remote_addr == ""
assert response.text == ""
assert response.body == b""
headers = {"X-Forwarded-For": "127.0.0.1, , ,,127.0.1.2"}
request, response = await app.asgi_client.get("/", headers=headers)
assert request.remote_addr == "127.0.0.1"
assert response.text == "127.0.0.1"
assert response.body == b"127.0.0.1"
headers = {
"X-Forwarded-For": ", 127.0.2.2, , ,127.0.0.1, , ,,127.0.1.2"
}
request, response = await app.asgi_client.get("/", headers=headers)
assert request.remote_addr == "127.0.0.1"
assert response.text == "127.0.0.1"
assert response.body == b"127.0.0.1"
def test_remote_addr_without_proxy(app):
@ -765,17 +770,17 @@ def test_remote_addr_without_proxy(app):
headers = {"X-Real-IP": "127.0.0.2", "X-Forwarded-For": "127.0.1.1"}
request, response = app.test_client.get("/", headers=headers)
assert request.remote_addr == ""
assert response.text == ""
assert response.body == b""
headers = {"X-Forwarded-For": "127.0.1.1"}
request, response = app.test_client.get("/", headers=headers)
assert request.remote_addr == ""
assert response.text == ""
assert response.body == b""
headers = {"X-Forwarded-For": "127.0.0.1, 127.0.1.2"}
request, response = app.test_client.get("/", headers=headers)
assert request.remote_addr == ""
assert response.text == ""
assert response.body == b""
@pytest.mark.asyncio
@ -789,17 +794,17 @@ async def test_remote_addr_without_proxy_asgi(app):
headers = {"X-Real-IP": "127.0.0.2", "X-Forwarded-For": "127.0.1.1"}
request, response = await app.asgi_client.get("/", headers=headers)
assert request.remote_addr == ""
assert response.text == ""
assert response.body == b""
headers = {"X-Forwarded-For": "127.0.1.1"}
request, response = await app.asgi_client.get("/", headers=headers)
assert request.remote_addr == ""
assert response.text == ""
assert response.body == b""
headers = {"X-Forwarded-For": "127.0.0.1, 127.0.1.2"}
request, response = await app.asgi_client.get("/", headers=headers)
assert request.remote_addr == ""
assert response.text == ""
assert response.body == b""
def test_remote_addr_custom_headers(app):
@ -814,17 +819,17 @@ def test_remote_addr_custom_headers(app):
headers = {"X-Real-IP": "127.0.0.2", "Forwarded": "127.0.1.1"}
request, response = app.test_client.get("/", headers=headers)
assert request.remote_addr == "127.0.1.1"
assert response.text == "127.0.1.1"
assert response.body == b"127.0.1.1"
headers = {"X-Forwarded-For": "127.0.1.1"}
request, response = app.test_client.get("/", headers=headers)
assert request.remote_addr == ""
assert response.text == ""
assert response.body == b""
headers = {"Client-IP": "127.0.0.2", "Forwarded": "127.0.1.1"}
request, response = app.test_client.get("/", headers=headers)
assert request.remote_addr == "127.0.0.2"
assert response.text == "127.0.0.2"
assert response.body == b"127.0.0.2"
@pytest.mark.asyncio
@ -840,17 +845,17 @@ async def test_remote_addr_custom_headers_asgi(app):
headers = {"X-Real-IP": "127.0.0.2", "Forwarded": "127.0.1.1"}
request, response = await app.asgi_client.get("/", headers=headers)
assert request.remote_addr == "127.0.1.1"
assert response.text == "127.0.1.1"
assert response.body == b"127.0.1.1"
headers = {"X-Forwarded-For": "127.0.1.1"}
request, response = await app.asgi_client.get("/", headers=headers)
assert request.remote_addr == ""
assert response.text == ""
assert response.body == b""
headers = {"Client-IP": "127.0.0.2", "Forwarded": "127.0.1.1"}
request, response = await app.asgi_client.get("/", headers=headers)
assert request.remote_addr == "127.0.0.2"
assert response.text == "127.0.0.2"
assert response.body == b"127.0.0.2"
def test_forwarded_scheme(app):
@ -894,7 +899,7 @@ async def test_match_info_asgi(app):
request, response = await app.asgi_client.get("/api/v1/user/sanic_user/")
assert request.match_info == {"user_id": "sanic_user"}
assert json_loads(response.text) == {"user_id": "sanic_user"}
assert json_loads(response.body) == {"user_id": "sanic_user"}
# ------------------------------------------------------------ #
@ -916,7 +921,7 @@ def test_post_json(app):
assert request.json.get("test") == "OK"
assert request.json.get("test") == "OK" # for request.parsed_json
assert response.text == "OK"
assert response.body == b"OK"
@pytest.mark.asyncio
@ -934,7 +939,7 @@ async def test_post_json_asgi(app):
assert request.json.get("test") == "OK"
assert request.json.get("test") == "OK" # for request.parsed_json
assert response.text == "OK"
assert response.body == b"OK"
def test_post_form_urlencoded(app):
@ -2136,7 +2141,7 @@ def test_safe_method_with_body_ignored(app):
assert request.body == b""
assert request.json == None
assert response.text == "OK"
assert response.body == b"OK"
def test_safe_method_with_body(app):
@ -2153,4 +2158,4 @@ def test_safe_method_with_body(app):
assert request.body == data.encode("utf-8")
assert request.json.get("test") == "OK"
assert response.text == "OK"
assert response.body == b"OK"

View File

@ -14,6 +14,7 @@ import pytest
from aiofiles import os as async_os
from sanic_testing.testing import HOST, PORT
from sanic import Sanic
from sanic.response import (
HTTPResponse,
StreamingHTTPResponse,
@ -51,16 +52,22 @@ async def sample_streaming_fn(response):
await response.write("bar")
def test_method_not_allowed(app):
def test_method_not_allowed():
app = Sanic("app")
@app.get("/")
async def test_get(request):
return response.json({"hello": "world"})
request, response = app.test_client.head("/")
assert response.headers["Allow"] == "GET"
assert set(response.headers["Allow"].split(", ")) == {
"GET",
}
request, response = app.test_client.post("/")
assert response.headers["Allow"] == "GET"
assert set(response.headers["Allow"].split(", ")) == {"GET", "HEAD"}
app.router.reset()
@app.post("/")
async def test_post(request):
@ -68,12 +75,20 @@ def test_method_not_allowed(app):
request, response = app.test_client.head("/")
assert response.status == 405
assert set(response.headers["Allow"].split(", ")) == {"GET", "POST"}
assert set(response.headers["Allow"].split(", ")) == {
"GET",
"POST",
"HEAD",
}
assert response.headers["Content-Length"] == "0"
request, response = app.test_client.patch("/")
assert response.status == 405
assert set(response.headers["Allow"].split(", ")) == {"GET", "POST"}
assert set(response.headers["Allow"].split(", ")) == {
"GET",
"POST",
"HEAD",
}
assert response.headers["Content-Length"] == "0"
@ -237,7 +252,7 @@ def test_chunked_streaming_returns_correct_content(streaming_app):
@pytest.mark.asyncio
async def test_chunked_streaming_returns_correct_content_asgi(streaming_app):
request, response = await streaming_app.asgi_client.get("/")
assert response.text == "foo,bar"
assert response.body == b"foo,bar"
def test_non_chunked_streaming_adds_correct_headers(non_chunked_streaming_app):

View File

@ -1,18 +1,180 @@
import asyncio
from unittest.mock import Mock
import pytest
from sanic_routing.exceptions import ParameterNameConflicts, RouteExists
from sanic_testing.testing import SanicTestClient
from sanic import Sanic
from sanic import Blueprint, Sanic
from sanic.constants import HTTP_METHODS
from sanic.exceptions import NotFound
from sanic.request import Request
from sanic.response import json, text
from sanic.router import ParameterNameConflicts, RouteDoesNotExist, RouteExists
# ------------------------------------------------------------ #
# UTF-8
# ------------------------------------------------------------ #
@pytest.mark.parametrize(
"path,headers,expected",
(
# app base
(b"/", {}, 200),
(b"/", {"host": "maybe.com"}, 200),
(b"/host", {"host": "matching.com"}, 200),
(b"/host", {"host": "wrong.com"}, 404),
# app strict_slashes default
(b"/without", {}, 200),
(b"/without/", {}, 200),
(b"/with", {}, 200),
(b"/with/", {}, 200),
# app strict_slashes off - expressly
(b"/expwithout", {}, 200),
(b"/expwithout/", {}, 200),
(b"/expwith", {}, 200),
(b"/expwith/", {}, 200),
# app strict_slashes on
(b"/without/strict", {}, 200),
(b"/without/strict/", {}, 404),
(b"/with/strict", {}, 404),
(b"/with/strict/", {}, 200),
# bp1 base
(b"/bp1", {}, 200),
(b"/bp1", {"host": "maybe.com"}, 200),
(b"/bp1/host", {"host": "matching.com"}, 200), # BROKEN ON MASTER
(b"/bp1/host", {"host": "wrong.com"}, 404),
# bp1 strict_slashes default
(b"/bp1/without", {}, 200),
(b"/bp1/without/", {}, 200),
(b"/bp1/with", {}, 200),
(b"/bp1/with/", {}, 200),
# bp1 strict_slashes off - expressly
(b"/bp1/expwithout", {}, 200),
(b"/bp1/expwithout/", {}, 200),
(b"/bp1/expwith", {}, 200),
(b"/bp1/expwith/", {}, 200),
# bp1 strict_slashes on
(b"/bp1/without/strict", {}, 200),
(b"/bp1/without/strict/", {}, 404),
(b"/bp1/with/strict", {}, 404),
(b"/bp1/with/strict/", {}, 200),
# bp2 base
(b"/bp2/", {}, 200),
(b"/bp2/", {"host": "maybe.com"}, 200),
(b"/bp2/host", {"host": "matching.com"}, 200), # BROKEN ON MASTER
(b"/bp2/host", {"host": "wrong.com"}, 404),
# bp2 strict_slashes default
(b"/bp2/without", {}, 200),
(b"/bp2/without/", {}, 404),
(b"/bp2/with", {}, 404),
(b"/bp2/with/", {}, 200),
# # bp2 strict_slashes off - expressly
(b"/bp2/expwithout", {}, 200),
(b"/bp2/expwithout/", {}, 200),
(b"/bp2/expwith", {}, 200),
(b"/bp2/expwith/", {}, 200),
# # bp2 strict_slashes on
(b"/bp2/without/strict", {}, 200),
(b"/bp2/without/strict/", {}, 404),
(b"/bp2/with/strict", {}, 404),
(b"/bp2/with/strict/", {}, 200),
# bp3 base
(b"/bp3", {}, 200),
(b"/bp3", {"host": "maybe.com"}, 200),
(b"/bp3/host", {"host": "matching.com"}, 200), # BROKEN ON MASTER
(b"/bp3/host", {"host": "wrong.com"}, 404),
# bp3 strict_slashes default
(b"/bp3/without", {}, 200),
(b"/bp3/without/", {}, 200),
(b"/bp3/with", {}, 200),
(b"/bp3/with/", {}, 200),
# bp3 strict_slashes off - expressly
(b"/bp3/expwithout", {}, 200),
(b"/bp3/expwithout/", {}, 200),
(b"/bp3/expwith", {}, 200),
(b"/bp3/expwith/", {}, 200),
# bp3 strict_slashes on
(b"/bp3/without/strict", {}, 200),
(b"/bp3/without/strict/", {}, 404),
(b"/bp3/with/strict", {}, 404),
(b"/bp3/with/strict/", {}, 200),
# bp4 base
(b"/bp4", {}, 404),
(b"/bp4", {"host": "maybe.com"}, 200),
(b"/bp4/host", {"host": "matching.com"}, 200), # BROKEN ON MASTER
(b"/bp4/host", {"host": "wrong.com"}, 404),
# bp4 strict_slashes default
(b"/bp4/without", {}, 404),
(b"/bp4/without/", {}, 404),
(b"/bp4/with", {}, 404),
(b"/bp4/with/", {}, 404),
# bp4 strict_slashes off - expressly
(b"/bp4/expwithout", {}, 404),
(b"/bp4/expwithout/", {}, 404),
(b"/bp4/expwith", {}, 404),
(b"/bp4/expwith/", {}, 404),
# bp4 strict_slashes on
(b"/bp4/without/strict", {}, 404),
(b"/bp4/without/strict/", {}, 404),
(b"/bp4/with/strict", {}, 404),
(b"/bp4/with/strict/", {}, 404),
),
)
def test_matching(path, headers, expected):
app = Sanic("dev")
bp1 = Blueprint("bp1", url_prefix="/bp1")
bp2 = Blueprint("bp2", url_prefix="/bp2", strict_slashes=True)
bp3 = Blueprint("bp3", url_prefix="/bp3", strict_slashes=False)
bp4 = Blueprint("bp4", url_prefix="/bp4", host="maybe.com")
def handler(request):
return text("Hello!")
defs = (
("/", None, None),
("/host", None, "matching.com"),
("/without", None, None),
("/with/", None, None),
("/expwithout", False, None),
("/expwith/", False, None),
("/without/strict", True, None),
("/with/strict/", True, None),
)
for uri, strict_slashes, host in defs:
params = {"uri": uri}
if strict_slashes is not None:
params["strict_slashes"] = strict_slashes
if host is not None:
params["host"] = host
app.route(**params)(handler)
bp1.route(**params)(handler)
bp2.route(**params)(handler)
bp3.route(**params)(handler)
bp4.route(**params)(handler)
app.blueprint(bp1)
app.blueprint(bp2)
app.blueprint(bp3)
app.blueprint(bp4)
app.router.finalize()
request = Request(path, headers, None, "GET", None, app)
try:
app.router.get(request=request)
except NotFound:
response = 404
except Exception:
response = 500
else:
response = 200
assert response == expected
# # ------------------------------------------------------------ #
# # UTF-8
# # ------------------------------------------------------------ #
@pytest.mark.parametrize("method", HTTP_METHODS)
@ -164,7 +326,6 @@ def test_route_optional_slash(app):
def test_route_strict_slashes_set_to_false_and_host_is_a_list(app):
# Part of regression test for issue #1120
test_client = SanicTestClient(app, port=42101)
site1 = f"127.0.0.1:{test_client.port}"
@ -176,6 +337,8 @@ def test_route_strict_slashes_set_to_false_and_host_is_a_list(app):
request, response = test_client.get("http://" + site1 + "/get")
assert response.text == "OK"
app.router.finalized = False
@app.post("/post", host=[site1, "site2.com"], strict_slashes=False)
def post_handler(request):
return text("OK")
@ -183,6 +346,8 @@ def test_route_strict_slashes_set_to_false_and_host_is_a_list(app):
request, response = test_client.post("http://" + site1 + "/post")
assert response.text == "OK"
app.router.finalized = False
@app.put("/put", host=[site1, "site2.com"], strict_slashes=False)
def put_handler(request):
return text("OK")
@ -190,6 +355,8 @@ def test_route_strict_slashes_set_to_false_and_host_is_a_list(app):
request, response = test_client.put("http://" + site1 + "/put")
assert response.text == "OK"
app.router.finalized = False
@app.delete("/delete", host=[site1, "site2.com"], strict_slashes=False)
def delete_handler(request):
return text("OK")
@ -294,6 +461,8 @@ def test_dynamic_route(app):
results.append(name)
return text("OK")
app.router.finalize(False)
request, response = app.test_client.get("/folder/test123")
assert response.text == "OK"
@ -368,6 +537,9 @@ def test_dynamic_route_regex(app):
async def handler(request, folder_id):
return text("OK")
app.router.finalize()
print(app.router.find_route_src)
request, response = app.test_client.get("/folder/test")
assert response.status == 200
@ -415,6 +587,8 @@ def test_dynamic_route_path(app):
request, response = app.test_client.get("/info")
assert response.status == 404
app.router.reset()
@app.route("/<path:path>")
async def handler1(request, path):
return text("OK")
@ -459,6 +633,19 @@ def test_websocket_route(app, url):
assert ev.is_set()
def test_websocket_route_invalid_handler(app):
with pytest.raises(ValueError) as e:
@app.websocket("/")
async def handler():
...
assert e.match(
r"Required parameter `request` and/or `ws` missing in the "
r"handler\(\) route\?"
)
@pytest.mark.asyncio
@pytest.mark.parametrize("url", ["/ws", "ws"])
async def test_websocket_route_asgi(app, url):
@ -774,7 +961,7 @@ def test_removing_slash(app):
def post(_):
pass
assert len(app.router.routes_all.keys()) == 2
assert len(app.router.routes_all.keys()) == 1
def test_overload_routes(app):
@ -798,6 +985,7 @@ def test_overload_routes(app):
request, response = app.test_client.delete("/overload")
assert response.status == 405
app.router.reset()
with pytest.raises(RouteExists):
@app.route("/overload", methods=["PUT", "DELETE"])
@ -810,11 +998,18 @@ def test_unmergeable_overload_routes(app):
async def handler1(request):
return text("OK1")
with pytest.raises(RouteExists):
@app.route("/overload_whole", methods=["POST", "PUT"])
async def handler2(request):
return text("Duplicated")
return text("OK1")
assert (
len(
dict(list(app.router.static_routes.values())[0].handlers)[
"overload_whole"
]
)
== 3
)
request, response = app.test_client.get("/overload_whole")
assert response.text == "OK1"
@ -822,6 +1017,11 @@ def test_unmergeable_overload_routes(app):
request, response = app.test_client.post("/overload_whole")
assert response.text == "OK1"
request, response = app.test_client.put("/overload_whole")
assert response.text == "OK1"
app.router.reset()
@app.route("/overload_part", methods=["GET"])
async def handler3(request):
return text("OK1")
@ -847,7 +1047,9 @@ def test_unicode_routes(app):
request, response = app.test_client.get("/你好")
assert response.text == "OK1"
@app.route("/overload/<param>", methods=["GET"])
app.router.reset()
@app.route("/overload/<param>", methods=["GET"], unquote=True)
async def handler2(request, param):
return text("OK2 " + param)
@ -865,21 +1067,39 @@ def test_uri_with_different_method_and_different_params(app):
return json({"action": action})
request, response = app.test_client.get("/ads/1234")
assert response.status == 200
assert response.json == {"ad_id": "1234"}
assert response.status == 405
request, response = app.test_client.post("/ads/post")
assert response.status == 200
assert response.json == {"action": "post"}
def test_route_raise_ParameterNameConflicts(app):
with pytest.raises(ParameterNameConflicts):
def test_uri_with_different_method_and_same_params(app):
@app.route("/ads/<ad_id>", methods=["GET"])
async def ad_get(request, ad_id):
return json({"ad_id": ad_id})
@app.route("/ads/<ad_id>", methods=["POST"])
async def ad_post(request, ad_id):
return json({"ad_id": ad_id})
request, response = app.test_client.get("/ads/1234")
assert response.status == 200
assert response.json == {"ad_id": "1234"}
request, response = app.test_client.post("/ads/post")
assert response.status == 200
assert response.json == {"ad_id": "post"}
def test_route_raise_ParameterNameConflicts(app):
@app.get("/api/v1/<user>/<user>/")
def handler(request, user):
return text("OK")
with pytest.raises(ParameterNameConflicts):
app.router.finalize()
def test_route_invalid_host(app):

View File

@ -8,6 +8,8 @@ import pytest
from sanic_testing.testing import HOST, PORT
from sanic.exceptions import InvalidUsage
AVAILABLE_LISTENERS = [
"before_server_start",
@ -80,6 +82,18 @@ def test_all_listeners(app):
assert app.name + listener_name == output.pop()
@skipif_no_alarm
def test_all_listeners_as_convenience(app):
output = []
for listener_name in AVAILABLE_LISTENERS:
listener = create_listener(listener_name, output)
method = getattr(app, listener_name)
method(listener)
start_stop_app(app)
for listener_name in AVAILABLE_LISTENERS:
assert app.name + listener_name == output.pop()
@pytest.mark.asyncio
async def test_trigger_before_events_create_server(app):
class MySanicDb:
@ -95,6 +109,20 @@ async def test_trigger_before_events_create_server(app):
assert isinstance(app.db, MySanicDb)
@pytest.mark.asyncio
async def test_trigger_before_events_create_server_missing_event(app):
class MySanicDb:
pass
with pytest.raises(InvalidUsage):
@app.listener
async def init_db(app, loop):
app.db = MySanicDb()
assert not hasattr(app, "db")
def test_create_server_trigger_events(app):
"""Test if create_server can trigger server events"""

View File

@ -106,6 +106,7 @@ def test_static_file_bytes(app, static_file_directory, file_name):
[dict(), list(), object()],
)
def test_static_file_invalid_path(app, static_file_directory, file_name):
app.route("/")(lambda x: x)
with pytest.raises(ValueError):
app.static("/testing.file", file_name)
request, response = app.test_client.get("/testing.file")
@ -126,6 +127,40 @@ def test_static_file_content_type(app, static_file_directory, file_name):
assert response.headers["Content-Type"] == "text/html; charset=utf-8"
@pytest.mark.parametrize(
"file_name,expected",
[
("test.html", "text/html; charset=utf-8"),
("decode me.txt", "text/plain; charset=utf-8"),
("test.file", "application/octet-stream"),
],
)
def test_static_file_content_type_guessed(
app, static_file_directory, file_name, expected
):
app.static(
"/testing.file",
get_file_path(static_file_directory, file_name),
)
request, response = app.test_client.get("/testing.file")
assert response.status == 200
assert response.body == get_file_content(static_file_directory, file_name)
assert response.headers["Content-Type"] == expected
def test_static_file_content_type_with_charset(app, static_file_directory):
app.static(
"/testing.file",
get_file_path(static_file_directory, "decode me.txt"),
content_type="text/plain;charset=ISO-8859-1",
)
request, response = app.test_client.get("/testing.file")
assert response.status == 200
assert response.headers["Content-Type"] == "text/plain;charset=ISO-8859-1"
@pytest.mark.parametrize(
"file_name", ["test.file", "decode me.txt", "symlink", "hard_link"]
)

View File

@ -7,6 +7,7 @@ import pytest as pytest
from sanic_testing.testing import HOST as test_host
from sanic_testing.testing import PORT as test_port
from sanic import Sanic
from sanic.blueprints import Blueprint
from sanic.exceptions import URLBuildError
from sanic.response import text
@ -98,36 +99,36 @@ def test_url_for_with_server_name(app):
assert response.text == "this should pass"
def test_fails_if_endpoint_not_found(app):
def test_fails_if_endpoint_not_found():
app = Sanic("app")
@app.route("/fail")
def fail(request):
return text("this should fail")
with pytest.raises(URLBuildError) as e:
app.url_for("passes")
assert str(e.value) == "Endpoint with name `passes` was not found"
e.match("Endpoint with name `app.passes` was not found")
def test_fails_url_build_if_param_not_passed(app):
url = "/"
for letter in string.ascii_letters:
for letter in string.ascii_lowercase:
url += f"<{letter}>/"
@app.route(url)
def fail(request):
return text("this should fail")
fail_args = list(string.ascii_letters)
fail_args = list(string.ascii_lowercase)
fail_args.pop()
fail_kwargs = {l: l for l in fail_args}
with pytest.raises(URLBuildError) as e:
app.url_for("fail", **fail_kwargs)
assert "Required parameter `Z` was not passed to url_for" in str(e.value)
assert e.match("Required parameter `z` was not passed to url_for")
def test_fails_url_build_if_params_not_passed(app):
@ -137,8 +138,7 @@ def test_fails_url_build_if_params_not_passed(app):
with pytest.raises(ValueError) as e:
app.url_for("fail", _scheme="http")
assert str(e.value) == "When specifying _scheme, _external must be True"
assert e.match("When specifying _scheme, _external must be True")
COMPLEX_PARAM_URL = (
@ -168,7 +168,7 @@ def test_fails_with_int_message(app):
expected_error = (
r'Value "not_int" for parameter `foo` '
r"does not match pattern for type `int`: -?\d+"
r"does not match pattern for type `int`: ^-?\d+"
)
assert str(e.value) == expected_error
@ -199,14 +199,11 @@ def test_fails_with_two_letter_string_message(app):
with pytest.raises(URLBuildError) as e:
app.url_for("fail", **failing_kwargs)
expected_error = (
e.match(
'Value "foobar" for parameter `two_letter_string` '
"does not satisfy pattern [A-z]{2}"
"does not satisfy pattern ^[A-z]{2}$"
)
assert str(e.value) == expected_error
def test_fails_with_number_message(app):
@app.route(COMPLEX_PARAM_URL)
@ -218,14 +215,11 @@ def test_fails_with_number_message(app):
with pytest.raises(URLBuildError) as e:
app.url_for("fail", **failing_kwargs)
expected_error = (
e.match(
'Value "foo" for parameter `some_number` '
r"does not match pattern for type `float`: -?(?:\d+(?:\.\d*)?|\.\d+)"
r"does not match pattern for type `float`: ^-?(?:\d+(?:\.\d*)?|\.\d+)$"
)
assert str(e.value) == expected_error
@pytest.mark.parametrize("number", [3, -3, 13.123, -13.123])
def test_passes_with_negative_number_message(app, number):
@ -259,7 +253,8 @@ def test_adds_other_supplied_values_as_query_string(app):
@pytest.fixture
def blueprint_app(app):
def blueprint_app():
app = Sanic("app")
first_print = Blueprint("first", url_prefix="/first")
second_print = Blueprint("second", url_prefix="/second")
@ -273,11 +268,11 @@ def blueprint_app(app):
return text(f"foo from first : {param}")
@second_print.route("/foo") # noqa
def foo(request):
def bar(request):
return text("foo from second")
@second_print.route("/foo/<param>") # noqa
def foo_with_param(request, param):
def bar_with_param(request, param):
return text(f"foo from second : {param}")
app.blueprint(first_print)
@ -290,7 +285,7 @@ def test_blueprints_are_named_correctly(blueprint_app):
first_url = blueprint_app.url_for("first.foo")
assert first_url == "/first/foo"
second_url = blueprint_app.url_for("second.foo")
second_url = blueprint_app.url_for("second.bar")
assert second_url == "/second/foo"
@ -298,7 +293,7 @@ def test_blueprints_work_with_params(blueprint_app):
first_url = blueprint_app.url_for("first.foo_with_param", param="bar")
assert first_url == "/first/foo/bar"
second_url = blueprint_app.url_for("second.foo_with_param", param="bar")
second_url = blueprint_app.url_for("second.bar_with_param", param="bar")
assert second_url == "/second/foo/bar"

View File

@ -1,18 +1,18 @@
import asyncio
import pytest
from sanic_testing.testing import SanicTestClient
from sanic.blueprints import Blueprint
def test_routes_with_host(app):
@app.route("/")
@app.route("/", name="hostindex", host="example.com")
@app.route("/path", name="hostpath", host="path.example.com")
def index(request):
pass
assert app.url_for("index") == "/"
assert app.url_for("hostindex") == "/"
assert app.url_for("hostpath") == "/path"
assert app.url_for("hostindex", _external=True) == "http://example.com/"
@ -22,6 +22,27 @@ def test_routes_with_host(app):
)
def test_routes_with_multiple_hosts(app):
@app.route("/", name="hostindex", host=["example.com", "path.example.com"])
def index(request):
pass
assert app.url_for("hostindex") == "/"
assert (
app.url_for("hostindex", _host="example.com") == "http://example.com/"
)
with pytest.raises(ValueError) as e:
assert app.url_for("hostindex", _external=True)
assert str(e.value).startswith("Host is ambiguous")
with pytest.raises(ValueError) as e:
assert app.url_for("hostindex", _host="unknown.com")
assert str(e.value).startswith(
"Requested host (unknown.com) is not available for this route"
)
def test_websocket_bp_route_name(app):
"""Tests that blueprint websocket route is named."""
event = asyncio.Event()
@ -63,3 +84,7 @@ def test_websocket_bp_route_name(app):
uri = app.url_for("test_bp.foobar_3")
assert uri == "/bp/route3"
# TODO: add test with a route with multiple hosts
# TODO: add test with a route with _host in url_for

View File

@ -3,6 +3,7 @@ import os
import pytest
from sanic import Sanic
from sanic.blueprints import Blueprint
@ -26,9 +27,15 @@ def get_file_content(static_file_directory, file_name):
@pytest.mark.parametrize(
"file_name", ["test.file", "decode me.txt", "python.png"]
"file_name",
[
"test.file",
"decode me.txt",
"python.png",
],
)
def test_static_file(app, static_file_directory, file_name):
def test_static_file(static_file_directory, file_name):
app = Sanic("qq")
app.static(
"/testing.file", get_file_path(static_file_directory, file_name)
)
@ -38,6 +45,8 @@ def test_static_file(app, static_file_directory, file_name):
name="testing_file",
)
app.router.finalize()
uri = app.url_for("static")
uri2 = app.url_for("static", filename="any")
uri3 = app.url_for("static", name="static", filename="any")
@ -46,10 +55,14 @@ def test_static_file(app, static_file_directory, file_name):
assert uri == uri2
assert uri2 == uri3
app.router.reset()
request, response = app.test_client.get(uri)
assert response.status == 200
assert response.body == get_file_content(static_file_directory, file_name)
app.router.reset()
bp = Blueprint("test_bp_static", url_prefix="/bp")
bp.static("/testing.file", get_file_path(static_file_directory, file_name))
@ -61,19 +74,14 @@ def test_static_file(app, static_file_directory, file_name):
app.blueprint(bp)
uri = app.url_for("static", name="test_bp_static.static")
uri2 = app.url_for("static", name="test_bp_static.static", filename="any")
uri3 = app.url_for("test_bp_static.static")
uri4 = app.url_for("test_bp_static.static", name="any")
uri5 = app.url_for("test_bp_static.static", filename="any")
uri6 = app.url_for("test_bp_static.static", name="any", filename="any")
uris = [
app.url_for("static", name="test_bp_static.static"),
app.url_for("static", name="test_bp_static.static", filename="any"),
app.url_for("test_bp_static.static"),
app.url_for("test_bp_static.static", filename="any"),
]
assert uri == "/bp/testing.file"
assert uri == uri2
assert uri2 == uri3
assert uri3 == uri4
assert uri4 == uri5
assert uri5 == uri6
assert all(uri == "/bp/testing.file" for uri in uris)
request, response = app.test_client.get(uri)
assert response.status == 200
@ -112,7 +120,9 @@ def test_static_file(app, static_file_directory, file_name):
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt"])
@pytest.mark.parametrize("base_uri", ["/static", "", "/dir"])
def test_static_directory(app, file_name, base_uri, static_file_directory):
def test_static_directory(file_name, base_uri, static_file_directory):
app = Sanic("base")
app.static(base_uri, static_file_directory)
base_uri2 = base_uri + "/2"
app.static(base_uri2, static_file_directory, name="uploads")
@ -141,6 +151,8 @@ def test_static_directory(app, file_name, base_uri, static_file_directory):
bp.static(base_uri, static_file_directory)
bp.static(base_uri2, static_file_directory, name="uploads")
app.router.reset()
app.blueprint(bp)
uri = app.url_for(
@ -169,7 +181,8 @@ def test_static_directory(app, file_name, base_uri, static_file_directory):
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt"])
def test_static_head_request(app, file_name, static_file_directory):
def test_static_head_request(file_name, static_file_directory):
app = Sanic("base")
app.static(
"/testing.file",
get_file_path(static_file_directory, file_name),
@ -214,7 +227,8 @@ def test_static_head_request(app, file_name, static_file_directory):
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt"])
def test_static_content_range_correct(app, file_name, static_file_directory):
def test_static_content_range_correct(file_name, static_file_directory):
app = Sanic("base")
app.static(
"/testing.file",
get_file_path(static_file_directory, file_name),
@ -252,11 +266,6 @@ def test_static_content_range_correct(app, file_name, static_file_directory):
"static", name="test_bp_static.static", filename="any"
)
assert uri == app.url_for("test_bp_static.static")
assert uri == app.url_for("test_bp_static.static", name="any")
assert uri == app.url_for("test_bp_static.static", filename="any")
assert uri == app.url_for(
"test_bp_static.static", name="any", filename="any"
)
request, response = app.test_client.get(uri, headers=headers)
assert response.status == 206
@ -270,7 +279,8 @@ def test_static_content_range_correct(app, file_name, static_file_directory):
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt"])
def test_static_content_range_front(app, file_name, static_file_directory):
def test_static_content_range_front(file_name, static_file_directory):
app = Sanic("base")
app.static(
"/testing.file",
get_file_path(static_file_directory, file_name),
@ -308,11 +318,7 @@ def test_static_content_range_front(app, file_name, static_file_directory):
"static", name="test_bp_static.static", filename="any"
)
assert uri == app.url_for("test_bp_static.static")
assert uri == app.url_for("test_bp_static.static", name="any")
assert uri == app.url_for("test_bp_static.static", filename="any")
assert uri == app.url_for(
"test_bp_static.static", name="any", filename="any"
)
request, response = app.test_client.get(uri, headers=headers)
assert response.status == 206
@ -326,7 +332,8 @@ def test_static_content_range_front(app, file_name, static_file_directory):
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt"])
def test_static_content_range_back(app, file_name, static_file_directory):
def test_static_content_range_back(file_name, static_file_directory):
app = Sanic("base")
app.static(
"/testing.file",
get_file_path(static_file_directory, file_name),
@ -364,11 +371,7 @@ def test_static_content_range_back(app, file_name, static_file_directory):
"static", name="test_bp_static.static", filename="any"
)
assert uri == app.url_for("test_bp_static.static")
assert uri == app.url_for("test_bp_static.static", name="any")
assert uri == app.url_for("test_bp_static.static", filename="any")
assert uri == app.url_for(
"test_bp_static.static", name="any", filename="any"
)
request, response = app.test_client.get(uri, headers=headers)
assert response.status == 206
@ -382,7 +385,8 @@ def test_static_content_range_back(app, file_name, static_file_directory):
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt"])
def test_static_content_range_empty(app, file_name, static_file_directory):
def test_static_content_range_empty(file_name, static_file_directory):
app = Sanic("base")
app.static(
"/testing.file",
get_file_path(static_file_directory, file_name),
@ -420,11 +424,7 @@ def test_static_content_range_empty(app, file_name, static_file_directory):
"static", name="test_bp_static.static", filename="any"
)
assert uri == app.url_for("test_bp_static.static")
assert uri == app.url_for("test_bp_static.static", name="any")
assert uri == app.url_for("test_bp_static.static", filename="any")
assert uri == app.url_for(
"test_bp_static.static", name="any", filename="any"
)
request, response = app.test_client.get(uri)
assert response.status == 200
@ -440,6 +440,7 @@ def test_static_content_range_empty(app, file_name, static_file_directory):
@pytest.mark.parametrize("file_name", ["test.file", "decode me.txt"])
def test_static_content_range_error(app, file_name, static_file_directory):
app = Sanic("base")
app.static(
"/testing.file",
get_file_path(static_file_directory, file_name),
@ -475,11 +476,7 @@ def test_static_content_range_error(app, file_name, static_file_directory):
"static", name="test_bp_static.static", filename="any"
)
assert uri == app.url_for("test_bp_static.static")
assert uri == app.url_for("test_bp_static.static", name="any")
assert uri == app.url_for("test_bp_static.static", filename="any")
assert uri == app.url_for(
"test_bp_static.static", name="any", filename="any"
)
request, response = app.test_client.get(uri, headers=headers)
assert response.status == 416

View File

@ -1,7 +1,14 @@
import pytest
from sanic_routing.exceptions import RouteExists
from sanic import Sanic
from sanic.response import text
def test_vhosts(app):
def test_vhosts():
app = Sanic("app")
@app.route("/", host="example.com")
async def handler1(request):
return text("You're at example.com!")
@ -38,6 +45,8 @@ def test_vhosts_with_defaults(app):
async def handler1(request):
return text("Hello, world!")
with pytest.raises(RouteExists):
@app.route("/")
async def handler2(request):
return text("default")
@ -45,6 +54,3 @@ def test_vhosts_with_defaults(app):
headers = {"Host": "hello.com"}
request, response = app.test_client.get("/", headers=headers)
assert response.text == "Hello, world!"
request, response = app.test_client.get("/")
assert response.text == "default"

View File

@ -45,9 +45,9 @@ def test_unexisting_methods(app):
app.add_route(DummyView.as_view(), "/")
request, response = app.test_client.get("/")
assert response.text == "I am get method"
assert response.body == b"I am get method"
request, response = app.test_client.post("/")
assert "Method POST not allowed for URL /" in response.text
assert b"Method POST not allowed for URL /" in response.body
def test_argument_methods(app):
@ -215,17 +215,18 @@ def test_composition_view_runs_methods_as_expected(app, method):
if method in ["GET", "POST", "PUT"]:
request, response = getattr(app.test_client, method.lower())("/")
assert response.status == 200
assert response.text == "first method"
response = view(request)
assert response.body.decode() == "first method"
# response = view(request)
# assert response.body.decode() == "first method"
if method in ["DELETE", "PATCH"]:
request, response = getattr(app.test_client, method.lower())("/")
assert response.text == "second method"
# if method in ["DELETE", "PATCH"]:
# request, response = getattr(app.test_client, method.lower())("/")
# assert response.text == "second method"
response = view(request)
assert response.body.decode() == "second method"
# response = view(request)
# assert response.body.decode() == "second method"
@pytest.mark.parametrize("method", HTTP_METHODS)

View File

@ -9,6 +9,8 @@ from unittest import mock
import pytest
from sanic_testing.testing import ASGI_PORT as PORT
from sanic.app import Sanic
from sanic.worker import GunicornWorker
@ -17,7 +19,7 @@ from sanic.worker import GunicornWorker
def gunicorn_worker():
command = (
"gunicorn "
"--bind 127.0.0.1:1337 "
f"--bind 127.0.0.1:{PORT} "
"--worker-class sanic.worker.GunicornWorker "
"examples.simple_server:app"
)
@ -31,7 +33,7 @@ def gunicorn_worker():
def gunicorn_worker_with_access_logs():
command = (
"gunicorn "
"--bind 127.0.0.1:1338 "
f"--bind 127.0.0.1:{PORT + 1} "
"--worker-class sanic.worker.GunicornWorker "
"examples.simple_server:app"
)
@ -45,7 +47,7 @@ def gunicorn_worker_with_env_var():
command = (
'env SANIC_ACCESS_LOG="False" '
"gunicorn "
"--bind 127.0.0.1:1339 "
f"--bind 127.0.0.1:{PORT + 2} "
"--worker-class sanic.worker.GunicornWorker "
"--log-level info "
"examples.simple_server:app"
@ -56,7 +58,7 @@ def gunicorn_worker_with_env_var():
def test_gunicorn_worker(gunicorn_worker):
with urllib.request.urlopen("http://localhost:1337/") as f:
with urllib.request.urlopen(f"http://localhost:{PORT}/") as f:
res = json.loads(f.read(100).decode())
assert res["test"]
@ -65,7 +67,7 @@ def test_gunicorn_worker_no_logs(gunicorn_worker_with_env_var):
"""
if SANIC_ACCESS_LOG was set to False do not show access logs
"""
with urllib.request.urlopen("http://localhost:1339/") as _:
with urllib.request.urlopen(f"http://localhost:{PORT + 2}/") as _:
gunicorn_worker_with_env_var.kill()
assert not gunicorn_worker_with_env_var.stdout.read()
@ -74,7 +76,7 @@ def test_gunicorn_worker_with_logs(gunicorn_worker_with_access_logs):
"""
default - show access logs
"""
with urllib.request.urlopen("http://localhost:1338/") as _:
with urllib.request.urlopen(f"http://localhost:{PORT + 1}/") as _:
gunicorn_worker_with_access_logs.kill()
assert (
b"(sanic.access)[INFO][127.0.0.1"

View File

@ -7,7 +7,7 @@ setenv =
{py36,py37,py38,py39,pyNightly}-no-ext: SANIC_NO_UJSON=1
{py36,py37,py38,py39,pyNightly}-no-ext: SANIC_NO_UVLOOP=1
deps =
sanic-testing==0.1.2
sanic-testing
coverage==5.3
pytest==5.2.1
pytest-cov
@ -35,7 +35,7 @@ deps =
commands =
flake8 sanic
black --config ./.black.toml --check --verbose sanic/
isort --check-only sanic
isort --check-only sanic --profile=black
[testenv:type-checking]
deps =