Merge branch 'master' into master

This commit is contained in:
Eli Uriegas 2017-02-08 19:37:32 -06:00 committed by GitHub
commit eb059183f7
22 changed files with 808 additions and 184 deletions

View File

@ -158,3 +158,22 @@ app.blueprint(blueprint_v2)
app.run(host='0.0.0.0', port=8000, debug=True) app.run(host='0.0.0.0', port=8000, debug=True)
``` ```
## URL Building with `url_for`
If you wish to generate a URL for a route inside of a blueprint, remember that the endpoint name
takes the format `<blueprint_name>.<handler_name>`. For example:
```
@blueprint_v1.route('/')
async def root(request):
url = app.url_for('v1.post_handler', post_id=5)
return redirect(url)
@blueprint_v1.route('/post/<post_id>')
async def post_handler(request, post_id):
return text('Post {} in Blueprint V1'.format(post_id))
```

View File

@ -67,7 +67,7 @@ app.add_route(NameView.as_view(), '/<name>')
If you want to add any decorators to the class, you can set the `decorators` If you want to add any decorators to the class, you can set the `decorators`
class variable. These will be applied to the class when `as_view` is called. class variable. These will be applied to the class when `as_view` is called.
``` ```python
class ViewWithDecorator(HTTPMethodView): class ViewWithDecorator(HTTPMethodView):
decorators = [some_decorator_here] decorators = [some_decorator_here]
@ -77,6 +77,27 @@ class ViewWithDecorator(HTTPMethodView):
app.add_route(ViewWithDecorator.as_view(), '/url') app.add_route(ViewWithDecorator.as_view(), '/url')
``` ```
#### URL Building
If you wish to build a URL for an HTTPMethodView, remember that the class name will be the endpoint
that you will pass into `url_for`. For example:
```python
@app.route('/')
def index(request):
url = app.url_for('SpecialClassView')
return redirect(url)
class SpecialClassView(HTTPMethodView):
def get(self, request):
return text('Hello from the Special Class View!')
app.add_route(SpecialClassView.as_view(), '/special_class_view')
```
## Using CompositionView ## Using CompositionView
As an alternative to the `HTTPMethodView`, you can use `CompositionView` to As an alternative to the `HTTPMethodView`, you can use `CompositionView` to
@ -106,3 +127,5 @@ view.add(['POST', 'PUT'], lambda request: text('I am a post/put method'))
# Use the new view to handle requests to the base URL # Use the new view to handle requests to the base URL
app.add_route(view, '/') app.add_route(view, '/')
``` ```
Note: currently you cannot build a URL for a CompositionView using `url_for`.

View File

@ -6,3 +6,4 @@ A list of Sanic extensions created by the community.
Allows using redis, memcache or an in memory store. Allows using redis, memcache or an in memory store.
- [CORS](https://github.com/ashleysommer/sanic-cors): A port of flask-cors. - [CORS](https://github.com/ashleysommer/sanic-cors): A port of flask-cors.
- [Jinja2](https://github.com/lixxu/sanic-jinja2): Support for Jinja2 template. - [Jinja2](https://github.com/lixxu/sanic-jinja2): Support for Jinja2 template.
- [OpenAPI/Swagger](https://github.com/channelcat/sanic-openapi): OpenAPI support, plus a Swagger UI.

View File

@ -64,23 +64,37 @@ async def folder_handler(request, folder_id):
## HTTP request types ## HTTP request types
By default, a route defined on a URL will be used for all requests to that URL. By default, a route defined on a URL will be avaialble for only GET requests to that URL.
However, the `@app.route` decorator accepts an optional parameter, `methods`, However, the `@app.route` decorator accepts an optional parameter, `methods`,
which restricts the handler function to the HTTP methods in the given list. whicl allows the handler function to work with any of the HTTP methods in the list.
```python ```python
from sanic.response import text from sanic.response import text
@app.route('/post') @app.route('/post', methods=['POST'])
async def post_handler(request, methods=['POST']): async def post_handler(request):
return text('POST request - {}'.format(request.json)) return text('POST request - {}'.format(request.json))
@app.route('/get') @app.route('/get', methods=['GET'])
async def GET_handler(request, methods=['GET']): async def get_handler(request):
return text('GET request - {}'.format(request.args)) return text('GET request - {}'.format(request.args))
``` ```
There are also shorthand method decorators:
```python
from sanic.response import text
@app.post('/post')
async def post_handler(request):
return text('POST request - {}'.format(request.json))
@app.get('/get')
async def get_handler(request):
return text('GET request - {}'.format(request.args))
```
## The `add_route` method ## The `add_route` method
As we have seen, routes are often specified using the `@app.route` decorator. As we have seen, routes are often specified using the `@app.route` decorator.
@ -105,3 +119,35 @@ app.add_route(handler1, '/test')
app.add_route(handler2, '/folder/<name>') app.add_route(handler2, '/folder/<name>')
app.add_route(person_handler2, '/person/<name:[A-z]>', methods=['GET']) app.add_route(person_handler2, '/person/<name:[A-z]>', methods=['GET'])
``` ```
## URL building with `url_for`
Sanic provides a `url_for` method, to generate URLs based on the handler method name. This is useful if you want to avoid hardcoding url paths into your app; instead, you can just reference the handler name. For example:
```python
@app.route('/')
async def index(request):
# generate a URL for the endpoint `post_handler`
url = app.url_for('post_handler', post_id=5)
# the URL is `/posts/5`, redirect to it
return redirect(url)
@app.route('/posts/<post_id>')
async def post_handler(request, post_id):
return text('Post - {}'.format(post_id))
```
Other things to keep in mind when using `url_for`:
- Keyword arguments passed to `url_for` that are not request parameters will be included in the URL's query string. For example:
```python
url = app.url_for('post_handler', post_id=5, arg_one='one', arg_two='two')
# /posts/5?arg_one=one&arg_two=two
```
- All valid parameters must be passed to `url_for` to build a URL. If a parameter is not supplied, or if a parameter does not match the specified type, a `URLBuildError` will be thrown.

View File

@ -17,3 +17,5 @@ app.static('/the_best.png', '/home/ubuntu/test.png')
app.run(host="0.0.0.0", port=8000) app.run(host="0.0.0.0", port=8000)
``` ```
Note: currently you cannot build a URL for a static file using `url_for`.

View File

@ -3,6 +3,7 @@ from sanic.response import json
from multiprocessing import Event from multiprocessing import Event
from signal import signal, SIGINT from signal import signal, SIGINT
import asyncio import asyncio
import uvloop
app = Sanic(__name__) app = Sanic(__name__)
@ -10,10 +11,11 @@ app = Sanic(__name__)
async def test(request): async def test(request):
return json({"answer": "42"}) return json({"answer": "42"})
asyncio.set_event_loop(uvloop.new_event_loop())
server = app.create_server(host="0.0.0.0", port=8001) server = app.create_server(host="0.0.0.0", port=8001)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
task = asyncio.ensure_future(server) task = asyncio.ensure_future(server)
signal(SIGINT, lambda s, f: loop.close()) signal(SIGINT, lambda s, f: loop.stop())
try: try:
loop.run_forever() loop.run_forever()
except: except:

View File

@ -1,18 +1,18 @@
httptools
ujson
uvloop
aiohttp
aiocache aiocache
pytest
coverage
tox
gunicorn
bottle
kyoukai
falcon
tornado
aiofiles aiofiles
aiohttp
beautifulsoup4
bottle
coverage
falcon
gunicorn
httptools
kyoukai
pytest
recommonmark
sphinx sphinx
sphinx_rtd_theme sphinx_rtd_theme
recommonmark tornado
beautifulsoup4 tox
ujson
uvloop

View File

@ -1,4 +1,4 @@
aiofiles
httptools httptools
ujson ujson
uvloop uvloop
aiofiles

View File

@ -1,6 +1,6 @@
from .sanic import Sanic from .sanic import Sanic
from .blueprints import Blueprint from .blueprints import Blueprint
__version__ = '0.3.0' __version__ = '0.3.1'
__all__ = ['Sanic', 'Blueprint'] __all__ = ['Sanic', 'Blueprint']

View File

@ -35,6 +35,9 @@ class Blueprint:
# Routes # Routes
for future in self.routes: for future in self.routes:
# attach the blueprint name to the handler so that it can be
# prefixed properly in the router
future.handler.__blueprintname__ = self.name
# Prepend the blueprint URI prefix if available # Prepend the blueprint URI prefix if available
uri = url_prefix + future.uri if url_prefix else future.uri uri = url_prefix + future.uri if url_prefix else future.uri
app.route( app.route(

View File

@ -116,6 +116,10 @@ class ServerError(SanicException):
status_code = 500 status_code = 500
class URLBuildError(SanicException):
status_code = 500
class FileNotFound(NotFound): class FileNotFound(NotFound):
status_code = 404 status_code = 404
@ -150,3 +154,87 @@ class ContentRangeError(SanicException):
class InvalidRangeType(ContentRangeError): class InvalidRangeType(ContentRangeError):
pass pass
class Handler:
handlers = None
cached_handlers = None
_missing = object()
def __init__(self):
self.handlers = []
self.cached_handlers = {}
self.debug = False
def _render_traceback_html(self, exception, request):
exc_type, exc_value, tb = sys.exc_info()
frames = extract_tb(tb)
frame_html = []
for frame in frames:
frame_html.append(TRACEBACK_LINE_HTML.format(frame))
return TRACEBACK_WRAPPER_HTML.format(
style=TRACEBACK_STYLE,
exc_name=exc_type.__name__,
exc_value=exc_value,
frame_html=''.join(frame_html),
uri=request.url)
def add(self, exception, handler):
self.handlers.append((exception, handler))
def lookup(self, exception):
handler = self.cached_handlers.get(exception, self._missing)
if handler is self._missing:
for exception_class, handler in self.handlers:
if isinstance(exception, exception_class):
self.cached_handlers[type(exception)] = handler
return handler
self.cached_handlers[type(exception)] = None
handler = None
return handler
def response(self, request, exception):
"""
Fetches and executes an exception handler and returns a response object
:param request: Request
:param exception: Exception to handle
:return: Response object
"""
handler = self.lookup(exception)
try:
response = handler and handler(
request=request, exception=exception)
if response is None:
response = self.default(request=request, exception=exception)
except:
log.error(format_exc())
if self.debug:
response_message = (
'Exception raised in exception handler "{}" '
'for uri: "{}"\n{}').format(
handler.__name__, request.url, format_exc())
log.error(response_message)
return text(response_message, 500)
else:
return text('An error occurred while handling an error', 500)
return response
def default(self, request, exception):
log.error(format_exc())
if isinstance(exception, SanicException):
return text(
'Error: {}'.format(exception),
status=getattr(exception, 'status_code', 500))
elif self.debug:
html_output = self._render_traceback_html(exception, request)
response_message = (
'Exception occurred while handling uri: "{}"\n{}'.format(
request.url, format_exc()))
log.error(response_message)
return html(html_output, status=500)
else:
return html(INTERNAL_SERVER_ERROR_HTML, status=500)

View File

@ -4,7 +4,9 @@ from functools import lru_cache
from .exceptions import NotFound, InvalidUsage from .exceptions import NotFound, InvalidUsage
from .views import CompositionView from .views import CompositionView
Route = namedtuple('Route', ['handler', 'methods', 'pattern', 'parameters']) Route = namedtuple(
'Route',
['handler', 'methods', 'pattern', 'parameters', 'name'])
Parameter = namedtuple('Parameter', ['name', 'cast']) Parameter = namedtuple('Parameter', ['name', 'cast'])
REGEX_TYPES = { REGEX_TYPES = {
@ -59,6 +61,7 @@ class Router:
routes_static = None routes_static = None
routes_dynamic = None routes_dynamic = None
routes_always_check = None routes_always_check = None
parameter_pattern = re.compile(r'<(.+?)>')
def __init__(self): def __init__(self):
self.routes_all = {} self.routes_all = {}
@ -67,6 +70,29 @@ class Router:
self.routes_always_check = [] self.routes_always_check = []
self.hosts = None self.hosts = None
def parse_parameter_string(self, parameter_string):
"""
Parse a parameter string into its constituent name, type, and pattern
For example:
`parse_parameter_string('<param_one:[A-z]')` ->
('param_one', str, '[A-z]')
:param parameter_string: String to parse
:return: tuple containing
(parameter_name, parameter_type, parameter_pattern)
"""
# We could receive NAME or NAME:PATTERN
name = parameter_string
pattern = 'string'
if ':' in parameter_string:
name, pattern = parameter_string.split(':', 1)
default = (str, pattern)
# Pull from pre-configured types
_type, pattern = REGEX_TYPES.get(pattern, default)
return name, _type, pattern
def add(self, uri, methods, handler, host=None): def add(self, uri, methods, handler, host=None):
""" """
Adds a handler to the route list Adds a handler to the route list
@ -104,16 +130,11 @@ class Router:
properties = {"unhashable": None} properties = {"unhashable": None}
def add_parameter(match): def add_parameter(match):
# We could receive NAME or NAME:PATTERN
name = match.group(1) name = match.group(1)
pattern = 'string' name, _type, pattern = self.parse_parameter_string(name)
if ':' in name:
name, pattern = name.split(':', 1)
default = (str, pattern) parameter = Parameter(
# Pull from pre-configured types name=name, cast=_type)
_type, pattern = REGEX_TYPES.get(pattern, default)
parameter = Parameter(name=name, cast=_type)
parameters.append(parameter) parameters.append(parameter)
# Mark the whole route as unhashable if it has the hash key in it # Mark the whole route as unhashable if it has the hash key in it
@ -125,7 +146,7 @@ class Router:
return '({})'.format(pattern) return '({})'.format(pattern)
pattern_string = re.sub(r'<(.+?)>', add_parameter, uri) pattern_string = re.sub(self.parameter_pattern, add_parameter, uri)
pattern = re.compile(r'^{}$'.format(pattern_string)) pattern = re.compile(r'^{}$'.format(pattern_string))
def merge_route(route, methods, handler): def merge_route(route, methods, handler):
@ -169,9 +190,17 @@ class Router:
if route: if route:
route = merge_route(route, methods, handler) route = merge_route(route, methods, handler)
else: else:
# prefix the handler name with the blueprint name
# if available
if hasattr(handler, '__blueprintname__'):
handler_name = '{}.{}'.format(
handler.__blueprintname__, handler.__name__)
else:
handler_name = getattr(handler, '__name__', None)
route = Route( route = Route(
handler=handler, methods=methods, pattern=pattern, handler=handler, methods=methods, pattern=pattern,
parameters=parameters) parameters=parameters, name=handler_name)
self.routes_all[uri] = route self.routes_all[uri] = route
if properties['unhashable']: if properties['unhashable']:
@ -208,6 +237,23 @@ class Router:
if clean_cache: if clean_cache:
self._get.cache_clear() self._get.cache_clear()
@lru_cache(maxsize=ROUTER_CACHE_SIZE)
def find_route_by_view_name(self, view_name):
"""
Find a route in the router based on the specified view name.
:param view_name: string of view name to search by
:return: tuple containing (uri, Route)
"""
if not view_name:
return (None, None)
for uri, route in self.routes_all.items():
if route.name == view_name:
return uri, route
return (None, None)
def get(self, request): def get(self, request):
""" """
Gets a request handler based on the URL of the request, or raises an Gets a request handler based on the URL of the request, or raises an

View File

@ -1,15 +1,16 @@
import logging import logging
import re
import warnings import warnings
from asyncio import get_event_loop from asyncio import get_event_loop
from collections import deque from collections import deque
from functools import partial from functools import partial
from inspect import isawaitable, stack, getmodulename from inspect import isawaitable, stack, getmodulename
from traceback import format_exc from traceback import format_exc
from urllib.parse import urlencode, urlunparse
from .config import Config from .config import Config
from .constants import HTTP_METHODS from .constants import HTTP_METHODS
from .exceptions import ServerError from .exceptions import Handler, ServerError, URLBuildError
from .handlers import ErrorHandler
from .log import log from .log import log
from .response import HTTPResponse from .response import HTTPResponse
from .router import Router from .router import Router
@ -194,6 +195,89 @@ class Sanic:
DeprecationWarning) DeprecationWarning)
return self.blueprint(*args, **kwargs) return self.blueprint(*args, **kwargs)
def url_for(self, view_name: str, **kwargs):
"""Builds a URL based on a view name and the values provided.
In order to build a URL, all request parameters must be supplied as
keyword arguments, and each parameter must pass the test for the
specified parameter type. If these conditions are not met, a
`URLBuildError` will be thrown.
Keyword arguments that are not request parameters will be included in
the output URL's query string.
:param view_name: A string referencing the view name
:param **kwargs: keys and values that are used to build request
parameters and query string arguments.
:return: the built URL
Raises:
URLBuildError
"""
# find the route by the supplied view name
uri, route = self.router.find_route_by_view_name(view_name)
if not uri or not route:
raise URLBuildError(
'Endpoint with name `{}` was not found'.format(
view_name))
out = uri
# find all the parameters we will need to build in the URL
matched_params = re.findall(
self.router.parameter_pattern, uri)
for match in matched_params:
name, _type, pattern = self.router.parse_parameter_string(
match)
# we only want to match against each individual parameter
specific_pattern = '^{}$'.format(pattern)
supplied_param = None
if kwargs.get(name):
supplied_param = kwargs.get(name)
del kwargs[name]
else:
raise URLBuildError(
'Required parameter `{}` was not passed to url_for'.format(
name))
supplied_param = str(supplied_param)
# determine if the parameter supplied by the caller passes the test
# in the URL
passes_pattern = re.match(specific_pattern, supplied_param)
if not passes_pattern:
if _type != str:
msg = (
'Value "{}" for parameter `{}` does not '
'match pattern for type `{}`: {}'.format(
supplied_param, name, _type.__name__, pattern))
else:
msg = (
'Value "{}" for parameter `{}` '
'does not satisfy pattern {}'.format(
supplied_param, name, pattern))
raise URLBuildError(msg)
# replace the parameter in the URL with the supplied value
replacement_regex = '(<{}.*?>)'.format(name)
out = re.sub(
replacement_regex, supplied_param, out)
# parse the remainder of the keyword arguments into a querystring
if kwargs:
query_string = urlencode(kwargs)
out = urlunparse((
'', '', out,
'', query_string, ''
))
return out
# -------------------------------------------------------------------- # # -------------------------------------------------------------------- #
# Request Handling # Request Handling
# -------------------------------------------------------------------- # # -------------------------------------------------------------------- #
@ -364,18 +448,18 @@ class Sanic:
Helper function used by `run` and `create_server`. Helper function used by `run` and `create_server`.
""" """
self.error_handler.debug = debug
self.debug = debug
self.loop = loop = get_event_loop()
if loop is not None: if loop is not None:
if self.debug: if debug:
warnings.simplefilter('default') warnings.simplefilter('default')
warnings.warn("Passing a loop will be deprecated in version" warnings.warn("Passing a loop will be deprecated in version"
" 0.4.0 https://github.com/channelcat/sanic/" " 0.4.0 https://github.com/channelcat/sanic/"
"pull/335 has more information.", "pull/335 has more information.",
DeprecationWarning) DeprecationWarning)
self.error_handler.debug = debug
self.debug = debug
self.loop = loop = get_event_loop()
server_settings = { server_settings = {
'protocol': protocol, 'protocol': protocol,
'host': host, 'host': host,
@ -417,6 +501,7 @@ class Sanic:
if debug: if debug:
log.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG)
if self.config.LOGO is not None:
log.debug(self.config.LOGO) log.debug(self.config.LOGO)
if run_async: if run_async:

View File

@ -297,6 +297,7 @@ def serve(host, port, request_handler, error_handler, before_start=None,
:param protocol: Subclass of asyncio protocol class :param protocol: Subclass of asyncio protocol class
:return: Nothing :return: Nothing
""" """
if not run_async:
loop = async_loop.new_event_loop() loop = async_loop.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)

View File

@ -19,19 +19,20 @@ async def local_request(method, uri, cookies=None, *args, **kwargs):
def sanic_endpoint_test(app, method='get', uri='/', gather_request=True, def sanic_endpoint_test(app, method='get', uri='/', gather_request=True,
debug=False, server_kwargs={}, debug=False, server_kwargs={},
*request_args, **request_kwargs): *request_args, **request_kwargs):
results = [] results = [None, None]
exceptions = [] exceptions = []
if gather_request: if gather_request:
def _collect_request(request): def _collect_request(request):
results.append(request) if results[0] is None:
results[0] = request
app.request_middleware.appendleft(_collect_request) app.request_middleware.appendleft(_collect_request)
async def _collect_response(sanic, loop): async def _collect_response(sanic, loop):
try: try:
response = await local_request(method, uri, *request_args, response = await local_request(method, uri, *request_args,
**request_kwargs) **request_kwargs)
results.append(response) results[-1] = response
except Exception as e: except Exception as e:
exceptions.append(e) exceptions.append(e)
app.stop() app.stop()
@ -52,7 +53,7 @@ def sanic_endpoint_test(app, method='get', uri='/', gather_request=True,
results)) results))
else: else:
try: try:
return results[0] return results[-1]
except: except:
raise ValueError( raise ValueError(
"Request object expected, got ({})".format(results)) "Request object expected, got ({})".format(results))

View File

@ -64,6 +64,7 @@ class HTTPMethodView:
view.view_class = cls view.view_class = cls
view.__doc__ = cls.__doc__ view.__doc__ = cls.__doc__
view.__module__ = cls.__module__ view.__module__ = cls.__module__
view.__name__ = cls.__name__
return view return view

View File

@ -28,6 +28,13 @@ def handler_4(request):
return text(foo) return text(foo)
@exception_handler_app.route('/5')
def handler_5(request):
class CustomServerError(ServerError):
pass
raise CustomServerError('Custom server error')
@exception_handler_app.exception(NotFound, ServerError) @exception_handler_app.exception(NotFound, ServerError)
def handler_exception(request, exception): def handler_exception(request, exception):
return text("OK") return text("OK")
@ -71,3 +78,8 @@ def test_html_traceback_output_in_debug_mode():
assert ( assert (
"NameError: name 'bar' " "NameError: name 'bar' "
"is not defined while handling uri /4") == summary_text "is not defined while handling uri /4") == summary_text
def test_inherited_exception_handler():
request, response = sanic_endpoint_test(exception_handler_app, uri='/5')
assert response.status == 200

96
tests/test_redirect.py Normal file
View File

@ -0,0 +1,96 @@
import pytest
from sanic import Sanic
from sanic.response import text, redirect
from sanic.utils import sanic_endpoint_test
@pytest.fixture
def redirect_app():
app = Sanic('test_redirection')
@app.route('/redirect_init')
async def redirect_init(request):
return redirect("/redirect_target")
@app.route('/redirect_init_with_301')
async def redirect_init_with_301(request):
return redirect("/redirect_target", status=301)
@app.route('/redirect_target')
async def redirect_target(request):
return text('OK')
@app.route('/1')
def handler(request):
return redirect('/2')
@app.route('/2')
def handler(request):
return redirect('/3')
@app.route('/3')
def handler(request):
return text('OK')
return app
def test_redirect_default_302(redirect_app):
"""
We expect a 302 default status code and the headers to be set.
"""
request, response = sanic_endpoint_test(
redirect_app, method="get",
uri="/redirect_init",
allow_redirects=False)
assert response.status == 302
assert response.headers["Location"] == "/redirect_target"
assert response.headers["Content-Type"] == 'text/html; charset=utf-8'
def test_redirect_headers_none(redirect_app):
request, response = sanic_endpoint_test(
redirect_app, method="get",
uri="/redirect_init",
headers=None,
allow_redirects=False)
assert response.status == 302
assert response.headers["Location"] == "/redirect_target"
def test_redirect_with_301(redirect_app):
"""
Test redirection with a different status code.
"""
request, response = sanic_endpoint_test(
redirect_app, method="get",
uri="/redirect_init_with_301",
allow_redirects=False)
assert response.status == 301
assert response.headers["Location"] == "/redirect_target"
def test_get_then_redirect_follow_redirect(redirect_app):
"""
With `allow_redirects` we expect a 200.
"""
response = sanic_endpoint_test(
redirect_app, method="get",
uri="/redirect_init", gather_request=False,
allow_redirects=True)
assert response.status == 200
assert response.text == 'OK'
def test_chained_redirect(redirect_app):
"""Test sanic_endpoint_test is working for redirection"""
request, response = sanic_endpoint_test(redirect_app, uri='/1')
assert request.url.endswith('/1')
assert response.status == 200
assert response.text == 'OK'
assert response.url.endswith('/3')

View File

@ -193,73 +193,3 @@ def test_post_form_multipart_form_data():
request, response = sanic_endpoint_test(app, data=payload, headers=headers) request, response = sanic_endpoint_test(app, data=payload, headers=headers)
assert request.form.get('test') == 'OK' assert request.form.get('test') == 'OK'
@pytest.fixture
def redirect_app():
app = Sanic('test_redirection')
@app.route('/redirect_init')
async def redirect_init(request):
return redirect("/redirect_target")
@app.route('/redirect_init_with_301')
async def redirect_init_with_301(request):
return redirect("/redirect_target", status=301)
@app.route('/redirect_target')
async def redirect_target(request):
return text('OK')
return app
def test_redirect_default_302(redirect_app):
"""
We expect a 302 default status code and the headers to be set.
"""
request, response = sanic_endpoint_test(
redirect_app, method="get",
uri="/redirect_init",
allow_redirects=False)
assert response.status == 302
assert response.headers["Location"] == "/redirect_target"
assert response.headers["Content-Type"] == 'text/html; charset=utf-8'
def test_redirect_headers_none(redirect_app):
request, response = sanic_endpoint_test(
redirect_app, method="get",
uri="/redirect_init",
headers=None,
allow_redirects=False)
assert response.status == 302
assert response.headers["Location"] == "/redirect_target"
def test_redirect_with_301(redirect_app):
"""
Test redirection with a different status code.
"""
request, response = sanic_endpoint_test(
redirect_app, method="get",
uri="/redirect_init_with_301",
allow_redirects=False)
assert response.status == 301
assert response.headers["Location"] == "/redirect_target"
def test_get_then_redirect_follow_redirect(redirect_app):
"""
With `allow_redirects` we expect a 200.
"""
response = sanic_endpoint_test(
redirect_app, method="get",
uri="/redirect_init", gather_request=False,
allow_redirects=True)
assert response.status == 200
assert response.text == 'OK'

View File

@ -16,67 +16,61 @@ def static_file_directory():
return static_directory return static_directory
@pytest.fixture(scope='module') def get_file_path(static_file_directory, file_name):
def static_file_path(static_file_directory): return os.path.join(static_file_directory, file_name)
"""The path to the static file that we want to serve"""
return os.path.join(static_file_directory, 'test.file')
@pytest.fixture(scope='module') def get_file_content(static_file_directory, file_name):
def static_file_content(static_file_path):
"""The content of the static file to check""" """The content of the static file to check"""
with open(static_file_path, 'rb') as file: with open(get_file_path(static_file_directory, file_name), 'rb') as file:
return file.read() return file.read()
def test_static_file(static_file_path, static_file_content): @pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
def test_static_file(static_file_directory, file_name):
app = Sanic('test_static') app = Sanic('test_static')
app.static('/testing.file', static_file_path) app.static(
'/testing.file', get_file_path(static_file_directory, file_name))
request, response = sanic_endpoint_test(app, uri='/testing.file') request, response = sanic_endpoint_test(app, uri='/testing.file')
assert response.status == 200 assert response.status == 200
assert response.body == static_file_content assert response.body == get_file_content(static_file_directory, file_name)
def test_static_directory( @pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
static_file_directory, static_file_path, static_file_content): @pytest.mark.parametrize('base_uri', ['/static', '', '/dir'])
def test_static_directory(file_name, base_uri, static_file_directory):
app = Sanic('test_static') app = Sanic('test_static')
app.static('/dir', static_file_directory) app.static(base_uri, static_file_directory)
request, response = sanic_endpoint_test(app, uri='/dir/test.file') request, response = sanic_endpoint_test(
app, uri='{}/{}'.format(base_uri, file_name))
assert response.status == 200 assert response.status == 200
assert response.body == static_file_content assert response.body == get_file_content(static_file_directory, file_name)
def test_static_url_decode_file(static_file_directory):
decode_me_path = os.path.join(static_file_directory, 'decode me.txt')
with open(decode_me_path, 'rb') as file:
decode_me_contents = file.read()
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
def test_static_head_request(
file_name, static_file_content, static_file_directory):
app = Sanic('test_static') app = Sanic('test_static')
app.static('/dir', static_file_directory) app.static(
'/testing.file', get_file_path(static_file_directory, file_name),
request, response = sanic_endpoint_test(app, uri='/dir/decode me.txt') use_content_range=True)
assert response.status == 200
assert response.body == decode_me_contents
def test_static_head_request(static_file_path, static_file_content):
app = Sanic('test_static')
app.static('/testing.file', static_file_path, use_content_range=True)
request, response = sanic_endpoint_test( request, response = sanic_endpoint_test(
app, uri='/testing.file', method='head') app, uri='/testing.file', method='head')
assert response.status == 200 assert response.status == 200
assert 'Accept-Ranges' in response.headers assert 'Accept-Ranges' in response.headers
assert 'Content-Length' in response.headers assert 'Content-Length' in response.headers
assert int(response.headers['Content-Length']) == len(static_file_content) assert int(response.headers['Content-Length']) == len(get_file_content(static_file_directory, file_name))
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
def test_static_content_range_correct(static_file_path, static_file_content): def test_static_content_range_correct(
file_name, static_file_content, static_file_directory):
app = Sanic('test_static') app = Sanic('test_static')
app.static('/testing.file', static_file_path, use_content_range=True) app.static(
'/testing.file', get_file_path(static_file_directory, file_name),
use_content_range=True)
headers = { headers = {
'Range': 'bytes=12-19' 'Range': 'bytes=12-19'
@ -86,14 +80,17 @@ def test_static_content_range_correct(static_file_path, static_file_content):
assert response.status == 200 assert response.status == 200
assert 'Content-Length' in response.headers assert 'Content-Length' in response.headers
assert 'Content-Range' in response.headers assert 'Content-Range' in response.headers
static_content = bytes(static_file_content)[12:19] static_content = bytes(get_file_content(static_file_directory, file_name))[12:19]
assert int(response.headers['Content-Length']) == len(static_content) assert int(response.headers['Content-Length']) == len(get_file_content(static_file_directory, file_name))
assert response.body == static_content assert response.body == get_file_content(static_file_directory, file_name)
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
def test_static_content_range_front(static_file_path, static_file_content): def test_static_content_range_front(
file_name, static_file_content, static_file_directory):
app = Sanic('test_static') app = Sanic('test_static')
app.static('/testing.file', static_file_path, use_content_range=True) app.static(
'/testing.file', get_file_path(static_file_directory, file_name),
use_content_range=True)
headers = { headers = {
'Range': 'bytes=12-' 'Range': 'bytes=12-'
@ -103,14 +100,18 @@ def test_static_content_range_front(static_file_path, static_file_content):
assert response.status == 200 assert response.status == 200
assert 'Content-Length' in response.headers assert 'Content-Length' in response.headers
assert 'Content-Range' in response.headers assert 'Content-Range' in response.headers
static_content = bytes(static_file_content)[12:] static_content = bytes(get_file_content(static_file_directory, file_name))[12:]
assert int(response.headers['Content-Length']) == len(static_content) assert int(response.headers['Content-Length']) == len(get_file_content(static_file_directory, file_name))
assert response.body == static_content assert response.body == get_file_content(static_file_directory, file_name)
def test_static_content_range_back(static_file_path, static_file_content): @pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
def test_static_content_range_back(
file_name, static_file_content, static_file_directory):
app = Sanic('test_static') app = Sanic('test_static')
app.static('/testing.file', static_file_path, use_content_range=True) app.static(
'/testing.file', get_file_path(static_file_directory, file_name),
use_content_range=True)
headers = { headers = {
'Range': 'bytes=-12' 'Range': 'bytes=-12'
@ -120,26 +121,31 @@ def test_static_content_range_back(static_file_path, static_file_content):
assert response.status == 200 assert response.status == 200
assert 'Content-Length' in response.headers assert 'Content-Length' in response.headers
assert 'Content-Range' in response.headers assert 'Content-Range' in response.headers
static_content = bytes(static_file_content)[-12:] static_content = bytes(get_file_content(static_file_directory, file_name))[-12:]
assert int(response.headers['Content-Length']) == len(static_content) assert int(response.headers['Content-Length']) == len(get_file_content(static_file_directory, file_name))
assert response.body == static_content assert response.body == get_file_content(static_file_directory, file_name)
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
def test_static_content_range_empty(static_file_path, static_file_content): def test_static_content_range_empty(
file_name, static_file_content, static_file_directory):
app = Sanic('test_static') app = Sanic('test_static')
app.static('/testing.file', static_file_path, use_content_range=True) app.static(
'/testing.file', get_file_path(static_file_directory, file_name),
use_content_range=True)
request, response = sanic_endpoint_test(app, uri='/testing.file') request, response = sanic_endpoint_test(app, uri='/testing.file')
assert response.status == 200 assert response.status == 200
assert 'Content-Length' in response.headers assert 'Content-Length' in response.headers
assert 'Content-Range' not in response.headers assert 'Content-Range' not in response.headers
assert int(response.headers['Content-Length']) == len(static_file_content) assert int(response.headers['Content-Length']) == len(get_file_content(static_file_directory, file_name))
assert response.body == bytes(static_file_content) assert response.body == bytes(get_file_content(static_file_directory, file_name))
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
def test_static_content_range_error(static_file_path, static_file_content): def test_static_content_range_error(static_file_path, static_file_content):
app = Sanic('test_static') app = Sanic('test_static')
app.static('/testing.file', static_file_path, use_content_range=True) app.static(
'/testing.file', get_file_path(static_file_directory, file_name),
use_content_range=True)
headers = { headers = {
'Range': 'bytes=1-0' 'Range': 'bytes=1-0'

261
tests/test_url_building.py Normal file
View File

@ -0,0 +1,261 @@
import pytest as pytest
from urllib.parse import urlsplit, parse_qsl
from sanic import Sanic
from sanic.response import text
from sanic.views import HTTPMethodView
from sanic.blueprints import Blueprint
from sanic.utils import sanic_endpoint_test
from sanic.exceptions import URLBuildError
import string
def _generate_handlers_from_names(app, l):
for name in l:
# this is the easiest way to generate functions with dynamic names
exec('@app.route(name)\ndef {}(request):\n\treturn text("{}")'.format(
name, name))
@pytest.fixture
def simple_app():
app = Sanic('simple_app')
handler_names = list(string.ascii_letters)
_generate_handlers_from_names(app, handler_names)
return app
def test_simple_url_for_getting(simple_app):
for letter in string.ascii_letters:
url = simple_app.url_for(letter)
assert url == '/{}'.format(letter)
request, response = sanic_endpoint_test(
simple_app, uri=url)
assert response.status == 200
assert response.text == letter
def test_fails_if_endpoint_not_found():
app = Sanic('fail_url_build')
@app.route('/fail')
def fail():
return text('this should fail')
with pytest.raises(URLBuildError) as e:
app.url_for('passes')
assert str(e.value) == 'Endpoint with name `passes` was not found'
def test_fails_url_build_if_param_not_passed():
url = '/'
for letter in string.ascii_letters:
url += '<{}>/'.format(letter)
app = Sanic('fail_url_build')
@app.route(url)
def fail():
return text('this should fail')
fail_args = list(string.ascii_letters)
fail_args.pop()
fail_kwargs = {l: l for l in fail_args}
with pytest.raises(URLBuildError) as e:
app.url_for('fail', **fail_kwargs)
assert 'Required parameter `Z` was not passed to url_for' in str(e.value)
COMPLEX_PARAM_URL = (
'/<foo:int>/<four_letter_string:[A-z]{4}>/'
'<two_letter_string:[A-z]{2}>/<normal_string>/<some_number:number>')
PASSING_KWARGS = {
'foo': 4, 'four_letter_string': 'woof',
'two_letter_string': 'ba', 'normal_string': 'normal',
'some_number': '1.001'}
EXPECTED_BUILT_URL = '/4/woof/ba/normal/1.001'
def test_fails_with_int_message():
app = Sanic('fail_url_build')
@app.route(COMPLEX_PARAM_URL)
def fail():
return text('this should fail')
failing_kwargs = dict(PASSING_KWARGS)
failing_kwargs['foo'] = 'not_int'
with pytest.raises(URLBuildError) as e:
app.url_for('fail', **failing_kwargs)
expected_error = (
'Value "not_int" for parameter `foo` '
'does not match pattern for type `int`: \d+')
assert str(e.value) == expected_error
def test_fails_with_two_letter_string_message():
app = Sanic('fail_url_build')
@app.route(COMPLEX_PARAM_URL)
def fail():
return text('this should fail')
failing_kwargs = dict(PASSING_KWARGS)
failing_kwargs['two_letter_string'] = 'foobar'
with pytest.raises(URLBuildError) as e:
app.url_for('fail', **failing_kwargs)
expected_error = (
'Value "foobar" for parameter `two_letter_string` '
'does not satisfy pattern [A-z]{2}')
assert str(e.value) == expected_error
def test_fails_with_number_message():
app = Sanic('fail_url_build')
@app.route(COMPLEX_PARAM_URL)
def fail():
return text('this should fail')
failing_kwargs = dict(PASSING_KWARGS)
failing_kwargs['some_number'] = 'foo'
with pytest.raises(URLBuildError) as e:
app.url_for('fail', **failing_kwargs)
expected_error = (
'Value "foo" for parameter `some_number` '
'does not match pattern for type `float`: [0-9\\\\.]+')
assert str(e.value) == expected_error
def test_adds_other_supplied_values_as_query_string():
app = Sanic('passes')
@app.route(COMPLEX_PARAM_URL)
def passes():
return text('this should pass')
new_kwargs = dict(PASSING_KWARGS)
new_kwargs['added_value_one'] = 'one'
new_kwargs['added_value_two'] = 'two'
url = app.url_for('passes', **new_kwargs)
query = dict(parse_qsl(urlsplit(url).query))
assert query['added_value_one'] == 'one'
assert query['added_value_two'] == 'two'
@pytest.fixture
def blueprint_app():
app = Sanic('blueprints')
first_print = Blueprint('first', url_prefix='/first')
second_print = Blueprint('second', url_prefix='/second')
@first_print.route('/foo')
def foo():
return text('foo from first')
@first_print.route('/foo/<param>')
def foo_with_param(request, param):
return text(
'foo from first : {}'.format(param))
@second_print.route('/foo') # noqa
def foo():
return text('foo from second')
@second_print.route('/foo/<param>') # noqa
def foo_with_param(request, param):
return text(
'foo from second : {}'.format(param))
app.blueprint(first_print)
app.blueprint(second_print)
return app
def test_blueprints_are_named_correctly(blueprint_app):
first_url = blueprint_app.url_for('first.foo')
assert first_url == '/first/foo'
second_url = blueprint_app.url_for('second.foo')
assert second_url == '/second/foo'
def test_blueprints_work_with_params(blueprint_app):
first_url = blueprint_app.url_for('first.foo_with_param', param='bar')
assert first_url == '/first/foo/bar'
second_url = blueprint_app.url_for('second.foo_with_param', param='bar')
assert second_url == '/second/foo/bar'
@pytest.fixture
def methodview_app():
app = Sanic('methodview')
class ViewOne(HTTPMethodView):
def get(self, request):
return text('I am get method')
def post(self, request):
return text('I am post method')
def put(self, request):
return text('I am put method')
def patch(self, request):
return text('I am patch method')
def delete(self, request):
return text('I am delete method')
app.add_route(ViewOne.as_view('view_one'), '/view_one')
class ViewTwo(HTTPMethodView):
def get(self, request):
return text('I am get method')
def post(self, request):
return text('I am post method')
def put(self, request):
return text('I am put method')
def patch(self, request):
return text('I am patch method')
def delete(self, request):
return text('I am delete method')
app.add_route(ViewTwo.as_view(), '/view_two')
return app
def test_methodview_naming(methodview_app):
viewone_url = methodview_app.url_for('ViewOne')
viewtwo_url = methodview_app.url_for('ViewTwo')
assert viewone_url == '/view_one'
assert viewtwo_url == '/view_two'

View File

@ -1,16 +1,16 @@
[tox] [tox]
envlist = py35, py36, flake8 envlist = py35, py36, flake8
[travis]
[travis]
python = python =
3.5: py35, flake8 3.5: py35, flake8
3.6: py36, flake8 3.6: py36, flake8
[testenv]
[testenv]
deps = deps =
aiofiles
aiohttp aiohttp
pytest pytest
beautifulsoup4 beautifulsoup4
@ -18,6 +18,7 @@ deps =
commands = commands =
pytest tests {posargs} pytest tests {posargs}
[testenv:flake8] [testenv:flake8]
deps = deps =
flake8 flake8