Merge pull request #17 from channelcat/master

merge from upstream sanic
This commit is contained in:
7
2018-02-09 20:12:29 -08:00
committed by GitHub
26 changed files with 402 additions and 82 deletions

View File

@@ -1,6 +0,0 @@
FROM python:3.6
ADD . /app
WORKDIR /app
RUN pip install tox

View File

@@ -1,4 +1,4 @@
test:
find . -name "*.pyc" -delete
docker build -t sanic/test-image .
docker build -t sanic/test-image -f docker/Dockerfile .
docker run -t sanic/test-image tox

View File

@@ -21,19 +21,19 @@ Hello World Example
app = Sanic()
@app.route("/")
@app.route('/')
async def test(request):
return json({"hello": "world"})
return json({'hello': 'world'})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8000)
Installation
------------
- ``pip install sanic``
To install sanic without uvloop or json using bash, you can provide either or both of these environmental variables
To install sanic without uvloop or ujson using bash, you can provide either or both of these environmental variables
using any truthy string like `'y', 'yes', 't', 'true', 'on', '1'` and setting the NO_X to true will stop that features
installation.

28
docker/Dockerfile Normal file
View File

@@ -0,0 +1,28 @@
FROM alpine:3.7
RUN apk add --no-cache --update \
curl \
bash \
build-base \
ca-certificates \
git \
bzip2-dev \
linux-headers \
ncurses-dev \
openssl \
openssl-dev \
readline-dev \
sqlite-dev
RUN update-ca-certificates
RUN rm -rf /var/cache/apk/*
ENV PYENV_ROOT="/root/.pyenv"
ENV PATH="$PYENV_ROOT/bin:$PATH"
ADD . /app
WORKDIR /app
RUN /app/docker/bin/install_python.sh 3.5.4 3.6.4
ENTRYPOINT ["./docker/bin/entrypoint.sh"]

11
docker/bin/entrypoint.sh Executable file
View File

@@ -0,0 +1,11 @@
#!/bin/bash
set -e
eval "$(pyenv init -)"
eval "$(pyenv virtualenv-init -)"
source /root/.pyenv/completions/pyenv.bash
pip install tox
exec $@

17
docker/bin/install_python.sh Executable file
View File

@@ -0,0 +1,17 @@
#!/bin/bash
set -e
export CFLAGS='-O2'
export EXTRA_CFLAGS="-DTHREAD_STACK_SIZE=0x100000"
curl -L https://raw.githubusercontent.com/pyenv/pyenv-installer/master/bin/pyenv-installer | bash
eval "$(pyenv init -)"
for ver in $@
do
pyenv install $ver
done
pyenv global $@
pip install --upgrade pip
pyenv rehash

View File

@@ -51,6 +51,73 @@ will look like:
[Route(handler=<function bp_root at 0x7f908382f9d8>, methods=None, pattern=re.compile('^/$'), parameters=[])]
```
## Blueprint groups and nesting
Blueprints may also be registered as part of a list or tuple, where the registrar will recursively cycle through any sub-sequences of blueprints and register them accordingly. The `Blueprint.group` method is provided to simplify this process, allowing a 'mock' backend directory structure mimicking what's seen from the front end. Consider this (quite contrived) example:
```
api/
├──content/
│ ├──authors.py
│ ├──static.py
│ └──__init__.py
├──info.py
└──__init__.py
app.py
```
Initialization of this app's blueprint hierarchy could go as follows:
```python
# api/content/authors.py
from sanic import Blueprint
authors = Blueprint('content_authors', url_prefix='/authors')
```
```python
# api/content/static.py
from sanic import Blueprint
static = Blueprint('content_static', url_prefix='/static')
```
```python
# api/content/__init__.py
from sanic import Blueprint
from .static import static
from .authors import authors
content = Blueprint.group(assets, authors, url_prefix='/content')
```
```python
# api/info.py
from sanic import Blueprint
info = Blueprint('info', url_prefix='/info')
```
```python
# api/__init__.py
from sanic import Blueprint
from .content import content
from .info import info
api = Blueprint.group(content, info, url_prefix='/api')
```
And registering these blueprints in `app.py` can now be done like so:
```python
# app.py
from sanic import Sanic
from .api import api
app = Sanic(__name__)
app.blueprint(api)
```
## Using blueprints
Blueprints have much the same functionality as an application instance.

View File

@@ -92,10 +92,27 @@ class ViewWithDecorator(HTTPMethodView):
def get(self, request, name):
return text('Hello I have a decorator')
def post(self, request, name):
return text("Hello I also have a decorator")
app.add_route(ViewWithDecorator.as_view(), '/url')
```
#### URL Building
But if you just want to decorate some functions and not all functions, you can do as follows:
```python
class ViewWithSomeDecorator(HTTPMethodView):
@staticmethod
@some_decorator_here
def get(request, name):
return text("Hello I have a decorator")
def post(self, request, name):
return text("Hello I don't have any decorators")
```
## URL Building
If you wish to build a URL for an HTTPMethodView, remember that the class name will be the endpoint
that you will pass into `url_for`. For example:

View File

@@ -19,6 +19,7 @@ A list of Sanic extensions created by the community.
`Babel` library
- [Dispatch](https://github.com/ashleysommer/sanic-dispatcher): A dispatcher inspired by `DispatcherMiddleware` in werkzeug. Can act as a Sanic-to-WSGI adapter.
- [Sanic-OAuth](https://github.com/Sniedes722/Sanic-OAuth): OAuth Library for connecting to & creating your own token providers.
- [sanic-oauth](https://gitlab.com/SirEdvin/sanic-oauth): OAuth Library with many provider and OAuth1/OAuth2 support.
- [Sanic-nginx-docker-example](https://github.com/itielshwartz/sanic-nginx-docker-example): Simple and easy to use example of Sanic behined nginx using docker-compose.
- [sanic-graphql](https://github.com/graphql-python/sanic-graphql): GraphQL integration with Sanic
- [sanic-prometheus](https://github.com/dkruchinin/sanic-prometheus): Prometheus metrics for Sanic

View File

@@ -73,6 +73,8 @@ The following variables are accessible as properties on `Request` objects:
- `headers` (dict) - A case-insensitive dictionary that contains the request headers.
- `method` (str) - HTTP method of the request (ie `GET`, `POST`).
- `ip` (str) - IP address of the requester.
- `port` (str) - Port address of the requester.

View File

@@ -5,7 +5,7 @@ beautifulsoup4
coverage
httptools
flake8
pytest
pytest==3.3.2
tox
ujson
uvloop

View File

@@ -5,7 +5,7 @@ import warnings
from asyncio import get_event_loop, ensure_future, CancelledError
from collections import deque, defaultdict
from functools import partial
from inspect import isawaitable, stack, getmodulename
from inspect import getmodulename, isawaitable, signature, stack
from traceback import format_exc
from urllib.parse import urlencode, urlunparse
from ssl import create_default_context, Purpose
@@ -25,7 +25,6 @@ from sanic.websocket import WebSocketProtocol, ConnectionClosed
class Sanic:
def __init__(self, name=None, router=None, error_handler=None,
load_env=True, request_class=None,
strict_slashes=False, log_config=None,
@@ -111,9 +110,11 @@ class Sanic:
:param event: event to listen to
"""
def decorator(listener):
self.listeners[event].append(listener)
return listener
return decorator
# Decorator
@@ -143,12 +144,20 @@ class Sanic:
strict_slashes = self.strict_slashes
def response(handler):
args = [key for key in signature(handler).parameters.keys()]
if args:
if stream:
handler.is_stream = stream
self.router.add(uri=uri, methods=methods, handler=handler,
host=host, strict_slashes=strict_slashes,
version=version, name=name)
return handler
else:
raise ValueError(
'Required parameter `request` missing'
'in the {0}() route?'.format(
handler.__name__))
return response
@@ -372,10 +381,14 @@ class Sanic:
def blueprint(self, blueprint, **options):
"""Register a blueprint on the application.
:param blueprint: Blueprint object
:param blueprint: Blueprint object or (list, tuple) thereof
:param options: option dictionary with blueprint defaults
:return: Nothing
"""
if isinstance(blueprint, (list, tuple)):
for item in blueprint:
self.blueprint(item, **options)
return
if blueprint.name in self.blueprints:
assert self.blueprints[blueprint.name] is blueprint, \
'A blueprint with the name "%s" is already registered. ' \
@@ -482,7 +495,7 @@ class Sanic:
specific_pattern = '^{}$'.format(pattern)
supplied_param = None
if kwargs.get(name):
if name in kwargs:
supplied_param = kwargs.get(name)
del kwargs[name]
else:

View File

@@ -14,7 +14,6 @@ FutureStatic = namedtuple('Route',
class Blueprint:
def __init__(self, name,
url_prefix=None,
host=None, version=None,
@@ -38,6 +37,27 @@ class Blueprint:
self.version = version
self.strict_slashes = strict_slashes
@staticmethod
def group(*blueprints, url_prefix=''):
"""Create a list of blueprints, optionally
grouping them under a general URL prefix.
:param blueprints: blueprints to be registered as a group
:param url_prefix: URL route to be prepended to all sub-prefixes
"""
def chain(nested):
"""itertools.chain() but leaves strings untouched"""
for i in nested:
if isinstance(i, (list, tuple)):
yield from chain(i)
else:
yield i
bps = []
for bp in chain(blueprints):
bp.url_prefix = url_prefix + bp.url_prefix
bps.append(bp)
return bps
def register(self, app, options):
"""Register the blueprint to the sanic app."""

View File

@@ -18,7 +18,7 @@ except ImportError:
json_loads = json.loads
from sanic.exceptions import InvalidUsage
from sanic.log import error_logger
from sanic.log import error_logger, logger
DEFAULT_HTTP_CONTENT_TYPE = "application/octet-stream"
@@ -284,7 +284,8 @@ def parse_multipart_form(body, boundary):
form_parts = body.split(boundary)
for form_part in form_parts[1:-1]:
file_name = None
file_type = None
content_type = 'text/plain'
content_charset = 'utf-8'
field_name = None
line_index = 2
line_end_index = 0
@@ -302,24 +303,30 @@ def parse_multipart_form(body, boundary):
form_line[colon_index + 2:])
if form_header_field == 'content-disposition':
if 'filename' in form_parameters:
file_name = form_parameters['filename']
file_name = form_parameters.get('filename')
field_name = form_parameters.get('name')
elif form_header_field == 'content-type':
file_type = form_header_value
content_type = form_header_value
content_charset = form_parameters.get('charset', 'utf-8')
if field_name:
post_data = form_part[line_index:-4]
if file_name or file_type:
file = File(type=file_type, name=file_name, body=post_data)
if file_name:
form_file = File(type=content_type,
name=file_name,
body=post_data)
if field_name in files:
files[field_name].append(file)
files[field_name].append(form_file)
else:
files[field_name] = [file]
files[field_name] = [form_file]
else:
value = post_data.decode('utf-8')
value = post_data.decode(content_charset)
if field_name in fields:
fields[field_name].append(value)
else:
fields[field_name] = [value]
else:
logger.debug('Form-data field does not have a \'name\' parameter \
in the Content-Disposition header')
return fields, files

View File

@@ -72,6 +72,8 @@ STATUS_CODES = {
511: b'Network Authentication Required'
}
EMPTY_STATUS_CODES = [204, 304]
class BaseHTTPResponse:
def _encode_body(self, data):
@@ -195,8 +197,14 @@ class HTTPResponse(BaseHTTPResponse):
timeout_header = b''
if keep_alive and keep_alive_timeout is not None:
timeout_header = b'Keep-Alive: %d\r\n' % keep_alive_timeout
self.headers['Content-Length'] = self.headers.get(
'Content-Length', len(self.body))
body = b''
content_length = 0
if self.status not in EMPTY_STATUS_CODES:
body = self.body
content_length = self.headers.get('Content-Length', len(self.body))
self.headers['Content-Length'] = content_length
self.headers['Content-Type'] = self.headers.get(
'Content-Type', self.content_type)
@@ -218,7 +226,7 @@ class HTTPResponse(BaseHTTPResponse):
b'keep-alive' if keep_alive else b'close',
timeout_header,
headers,
self.body
body
)
@property

View File

@@ -234,11 +234,11 @@ class Router:
if properties['unhashable']:
routes_to_check = self.routes_always_check
ndx, route = self.check_dynamic_route_exists(
pattern, routes_to_check)
pattern, routes_to_check, parameters)
else:
routes_to_check = self.routes_dynamic[url_hash(uri)]
ndx, route = self.check_dynamic_route_exists(
pattern, routes_to_check)
pattern, routes_to_check, parameters)
if ndx != -1:
# Pop the ndx of the route, no dups of the same route
routes_to_check.pop(ndx)
@@ -285,9 +285,9 @@ class Router:
self.routes_static[uri] = route
@staticmethod
def check_dynamic_route_exists(pattern, routes_to_check):
def check_dynamic_route_exists(pattern, routes_to_check, parameters):
for ndx, route in enumerate(routes_to_check):
if route.pattern == pattern:
if route.pattern == pattern and route.parameters == parameters:
return ndx, route
else:
return -1, None

View File

@@ -5,7 +5,7 @@ from functools import partial
from inspect import isawaitable
from multiprocessing import Process
from signal import (
SIGTERM, SIGINT,
SIGTERM, SIGINT, SIG_IGN,
signal as signal_func,
Signals
)
@@ -20,9 +20,10 @@ from httptools import HttpRequestParser
from httptools.parser.errors import HttpParserError
try:
import uvloop as async_loop
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
async_loop = asyncio
pass
from sanic.log import logger, access_logger
from sanic.response import HTTPResponse
@@ -194,7 +195,7 @@ class HttpProtocol(asyncio.Protocol):
self.keep_alive_timeout_callback)
)
else:
logger.info('KeepAlive Timeout. Closing connection.')
logger.debug('KeepAlive Timeout. Closing connection.')
self.transport.close()
self.transport = None
@@ -509,11 +510,11 @@ def serve(host, port, request_handler, error_handler, before_start=None,
request_timeout=60, response_timeout=60, keep_alive_timeout=5,
ssl=None, sock=None, request_max_size=None, reuse_port=False,
loop=None, protocol=HttpProtocol, backlog=100,
register_sys_signals=True, run_async=False, connections=None,
signal=Signal(), request_class=None, access_log=True,
keep_alive=True, is_request_stream=False, router=None,
websocket_max_size=None, websocket_max_queue=None, state=None,
graceful_shutdown_timeout=15.0):
register_sys_signals=True, run_multiple=False, run_async=False,
connections=None, signal=Signal(), request_class=None,
access_log=True, keep_alive=True, is_request_stream=False,
router=None, websocket_max_size=None, websocket_max_queue=None,
state=None, graceful_shutdown_timeout=15.0):
"""Start asynchronous HTTP Server on an individual process.
:param host: Address to host on
@@ -547,7 +548,8 @@ def serve(host, port, request_handler, error_handler, before_start=None,
:return: Nothing
"""
if not run_async:
loop = async_loop.new_event_loop()
# create new event_loop after fork
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if debug:
@@ -603,9 +605,14 @@ def serve(host, port, request_handler, error_handler, before_start=None,
trigger_events(after_start, loop)
# Ignore SIGINT when run_multiple
if run_multiple:
signal_func(SIGINT, SIG_IGN)
# Register signals for graceful termination
if register_sys_signals:
for _signal in (SIGINT, SIGTERM):
_singals = (SIGTERM,) if run_multiple else (SIGINT, SIGTERM)
for _signal in _singals:
try:
loop.add_signal_handler(_signal, loop.stop)
except NotImplementedError:
@@ -668,6 +675,7 @@ def serve_multiple(server_settings, workers):
:return:
"""
server_settings['reuse_port'] = True
server_settings['run_multiple'] = True
# Handling when custom socket is not provided.
if server_settings.get('sock') is None:
@@ -682,12 +690,13 @@ def serve_multiple(server_settings, workers):
def sig_handler(signal, frame):
logger.info("Received signal %s. Shutting down.", Signals(signal).name)
for process in processes:
os.kill(process.pid, SIGINT)
os.kill(process.pid, SIGTERM)
signal_func(SIGINT, lambda s, f: sig_handler(s, f))
signal_func(SIGTERM, lambda s, f: sig_handler(s, f))
processes = []
for _ in range(workers):
process = Process(target=serve, kwargs=server_settings)
process.daemon = True

View File

@@ -6,12 +6,18 @@ from websockets import ConnectionClosed # noqa
class WebSocketProtocol(HttpProtocol):
def __init__(self, *args, websocket_max_size=None,
websocket_max_queue=None, **kwargs):
def __init__(self, *args, websocket_timeout=10,
websocket_max_size=None,
websocket_max_queue=None,
websocket_read_limit=2 ** 16,
websocket_write_limit=2 ** 16, **kwargs):
super().__init__(*args, **kwargs)
self.websocket = None
self.websocket_timeout = websocket_timeout
self.websocket_max_size = websocket_max_size
self.websocket_max_queue = websocket_max_queue
self.websocket_read_limit = websocket_read_limit
self.websocket_write_limit = websocket_write_limit
# timeouts make no sense for websocket routes
def request_timeout_callback(self):
@@ -85,8 +91,11 @@ class WebSocketProtocol(HttpProtocol):
# hook up the websocket protocol
self.websocket = WebSocketCommonProtocol(
timeout=self.websocket_timeout,
max_size=self.websocket_max_size,
max_queue=self.websocket_max_queue
max_queue=self.websocket_max_queue,
read_limit=self.websocket_read_limit,
write_limit=self.websocket_write_limit
)
self.websocket.subprotocol = subprotocol
self.websocket.connection_made(request.transport)

View File

@@ -43,7 +43,7 @@ setup_kwargs = {
'packages': ['sanic'],
'platforms': 'any',
'classifiers': [
'Development Status :: 2 - Pre-Alpha',
'Development Status :: 4 - Beta',
'Environment :: Web Environment',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python :: 3.5',

View File

@@ -446,3 +446,44 @@ def test_bp_shorthand():
'Sec-WebSocket-Version': '13'})
assert response.status == 101
assert ev.is_set()
def test_bp_group():
app = Sanic('test_nested_bp_groups')
deep_0 = Blueprint('deep_0', url_prefix='/deep')
deep_1 = Blueprint('deep_1', url_prefix = '/deep1')
@deep_0.route('/')
def handler(request):
return text('D0_OK')
@deep_1.route('/bottom')
def handler(request):
return text('D1B_OK')
mid_0 = Blueprint.group(deep_0, deep_1, url_prefix='/mid')
mid_1 = Blueprint('mid_tier', url_prefix='/mid1')
@mid_1.route('/')
def handler(request):
return text('M1_OK')
top = Blueprint.group(mid_0, mid_1)
app.blueprint(top)
@app.route('/')
def handler(request):
return text('TOP_OK')
request, response = app.test_client.get('/')
assert response.text == 'TOP_OK'
request, response = app.test_client.get('/mid1')
assert response.text == 'M1_OK'
request, response = app.test_client.get('/mid/deep')
assert response.text == 'D0_OK'
request, response = app.test_client.get('/mid/deep1/bottom')
assert response.text == 'D1B_OK'

View File

@@ -23,4 +23,3 @@ def test_multiprocessing():
app.run(HOST, app.test_port, workers=num_workers)
assert len(process_list) == num_workers

View File

@@ -104,18 +104,20 @@ def test_json():
assert results.get('test') == True
def test_empty_json():
app = Sanic('test_json')
@app.route('/')
async def handler(request):
assert request.json == None
assert request.json is None
return json(request.json)
request, response = app.test_client.get('/')
assert response.status == 200
assert response.text == 'null'
def test_invalid_json():
app = Sanic('test_json')

View File

@@ -16,7 +16,6 @@ from unittest.mock import MagicMock
JSON_DATA = {'ok': True}
def test_response_body_not_a_string():
"""Test when a response body sent from the application is not a string"""
app = Sanic('response_body_not_a_string')
@@ -35,6 +34,7 @@ async def sample_streaming_fn(response):
await asyncio.sleep(.001)
response.write('bar')
def test_method_not_allowed():
app = Sanic('method_not_allowed')
@@ -43,7 +43,7 @@ def test_method_not_allowed():
return response.json({'hello': 'world'})
request, response = app.test_client.head('/')
assert response.headers['Allow']== 'GET'
assert response.headers['Allow'] == 'GET'
@app.post('/')
async def test(request):
@@ -63,6 +63,22 @@ def json_app():
async def test(request):
return json(JSON_DATA)
@app.get("/no-content")
async def no_content_handler(request):
return json(JSON_DATA, status=204)
@app.get("/no-content/unmodified")
async def no_content_unmodified_handler(request):
return json(None, status=304)
@app.get("/unmodified")
async def unmodified_handler(request):
return json(JSON_DATA, status=304)
@app.delete("/")
async def delete_handler(request):
return json(None, status=204)
return app
@@ -73,6 +89,29 @@ def test_json_response(json_app):
assert response.text == json_dumps(JSON_DATA)
assert response.json == JSON_DATA
def test_no_content(json_app):
request, response = json_app.test_client.get('/no-content')
assert response.status == 204
assert response.text == ''
assert response.headers['Content-Length'] == '0'
request, response = json_app.test_client.get('/no-content/unmodified')
assert response.status == 304
assert response.text == ''
assert response.headers['Content-Length'] == '0'
request, response = json_app.test_client.get('/unmodified')
assert response.status == 304
assert response.text == ''
assert response.headers['Content-Length'] == '0'
request, response = json_app.test_client.delete('/')
assert response.status == 204
assert response.text == ''
assert response.headers['Content-Length'] == '0'
@pytest.fixture
def streaming_app():
app = Sanic('streaming')
@@ -156,9 +195,11 @@ def get_file_content(static_file_directory, file_name):
with open(os.path.join(static_file_directory, file_name), 'rb') as file:
return file.read()
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png'])
def test_file_response(file_name, static_file_directory):
app = Sanic('test_file_helper')
@app.route('/files/<filename>', methods=['GET'])
def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
@@ -170,10 +211,12 @@ def test_file_response(file_name, static_file_directory):
assert response.body == get_file_content(static_file_directory, file_name)
assert 'Content-Disposition' not in response.headers
@pytest.mark.parametrize('source,dest', [
('test.file', 'my_file.txt'), ('decode me.txt', 'readme.md'), ('python.png', 'logo.png')])
def test_file_response_custom_filename(source, dest, static_file_directory):
app = Sanic('test_file_helper')
@app.route('/files/<filename>', methods=['GET'])
def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
@@ -185,9 +228,11 @@ def test_file_response_custom_filename(source, dest, static_file_directory):
assert response.body == get_file_content(static_file_directory, source)
assert response.headers['Content-Disposition'] == 'attachment; filename="{}"'.format(dest)
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
def test_file_head_response(file_name, static_file_directory):
app = Sanic('test_file_helper')
@app.route('/files/<filename>', methods=['GET', 'HEAD'])
async def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
@@ -212,9 +257,11 @@ def test_file_head_response(file_name, static_file_directory):
'Content-Length']) == len(
get_file_content(static_file_directory, file_name))
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png'])
def test_file_stream_response(file_name, static_file_directory):
app = Sanic('test_file_helper')
@app.route('/files/<filename>', methods=['GET'])
def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
@@ -227,10 +274,12 @@ def test_file_stream_response(file_name, static_file_directory):
assert response.body == get_file_content(static_file_directory, file_name)
assert 'Content-Disposition' not in response.headers
@pytest.mark.parametrize('source,dest', [
('test.file', 'my_file.txt'), ('decode me.txt', 'readme.md'), ('python.png', 'logo.png')])
def test_file_stream_response_custom_filename(source, dest, static_file_directory):
app = Sanic('test_file_helper')
@app.route('/files/<filename>', methods=['GET'])
def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)
@@ -242,9 +291,11 @@ def test_file_stream_response_custom_filename(source, dest, static_file_director
assert response.body == get_file_content(static_file_directory, source)
assert response.headers['Content-Disposition'] == 'attachment; filename="{}"'.format(dest)
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
def test_file_stream_head_response(file_name, static_file_directory):
app = Sanic('test_file_helper')
@app.route('/files/<filename>', methods=['GET', 'HEAD'])
async def file_route(request, filename):
file_path = os.path.join(static_file_directory, filename)

View File

@@ -2,7 +2,7 @@ import asyncio
import pytest
from sanic import Sanic
from sanic.response import text
from sanic.response import text, json
from sanic.router import RouteExists, RouteDoesNotExist
from sanic.constants import HTTP_METHODS
@@ -907,3 +907,27 @@ def test_unicode_routes():
request, response = app.test_client.get('/overload/你好')
assert response.text == 'OK2 你好'
def test_uri_with_different_method_and_different_params():
app = Sanic('test_uri')
@app.route('/ads/<ad_id>', methods=['GET'])
async def ad_get(request, ad_id):
return json({'ad_id': ad_id})
@app.route('/ads/<action>', methods=['POST'])
async def ad_post(request, action):
return json({'action': action})
request, response = app.test_client.get('/ads/1234')
assert response.status == 200
assert response.json == {
'ad_id': '1234'
}
request, response = app.test_client.post('/ads/post')
assert response.status == 200
assert response.json == {
'action': 'post'
}

View File

@@ -75,7 +75,7 @@ def test_fails_if_endpoint_not_found():
app = Sanic('fail_url_build')
@app.route('/fail')
def fail():
def fail(request):
return text('this should fail')
with pytest.raises(URLBuildError) as e:
@@ -93,7 +93,7 @@ def test_fails_url_build_if_param_not_passed():
app = Sanic('fail_url_build')
@app.route(url)
def fail():
def fail(request):
return text('this should fail')
fail_args = list(string.ascii_letters)
@@ -111,7 +111,7 @@ def test_fails_url_build_if_params_not_passed():
app = Sanic('fail_url_build')
@app.route('/fail')
def fail():
def fail(request):
return text('this should fail')
with pytest.raises(ValueError) as e:
@@ -134,7 +134,7 @@ def test_fails_with_int_message():
app = Sanic('fail_url_build')
@app.route(COMPLEX_PARAM_URL)
def fail():
def fail(request):
return text('this should fail')
failing_kwargs = dict(PASSING_KWARGS)
@@ -153,7 +153,7 @@ def test_fails_with_two_letter_string_message():
app = Sanic('fail_url_build')
@app.route(COMPLEX_PARAM_URL)
def fail():
def fail(request):
return text('this should fail')
failing_kwargs = dict(PASSING_KWARGS)
@@ -173,7 +173,7 @@ def test_fails_with_number_message():
app = Sanic('fail_url_build')
@app.route(COMPLEX_PARAM_URL)
def fail():
def fail(request):
return text('this should fail')
failing_kwargs = dict(PASSING_KWARGS)
@@ -193,7 +193,7 @@ def test_adds_other_supplied_values_as_query_string():
app = Sanic('passes')
@app.route(COMPLEX_PARAM_URL)
def passes():
def passes(request):
return text('this should pass')
new_kwargs = dict(PASSING_KWARGS)
@@ -216,7 +216,7 @@ def blueprint_app():
second_print = Blueprint('second', url_prefix='/second')
@first_print.route('/foo')
def foo():
def foo(request):
return text('foo from first')
@first_print.route('/foo/<param>')
@@ -225,7 +225,7 @@ def blueprint_app():
'foo from first : {}'.format(param))
@second_print.route('/foo') # noqa
def foo():
def foo(request):
return text('foo from second')
@second_print.route('/foo/<param>') # noqa

View File

@@ -8,7 +8,7 @@ setenv =
{py35,py36}-no-ext: SANIC_NO_UVLOOP=1
deps =
coverage
pytest
pytest==3.3.2
pytest-cov
pytest-sanic
pytest-sugar