Merge pull request #32 from huge-success/master

merge upstream master branch
This commit is contained in:
7 2018-11-10 21:26:37 +08:00 committed by GitHub
commit 92cd10c6a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
56 changed files with 639 additions and 207 deletions

0
docs/_static/.gitkeep vendored Normal file
View File

View File

@ -21,8 +21,11 @@ Guides
sanic/streaming
sanic/class_based_views
sanic/custom_protocol
sanic/sockets
sanic/ssl
sanic/logging
sanic/versioning
sanic/debug_mode
sanic/testing
sanic/deploying
sanic/extensions

View File

@ -48,7 +48,7 @@ by that blueprint. In this example, the registered routes in the `app.router`
will look like:
```python
[Route(handler=<function bp_root at 0x7f908382f9d8>, methods=None, pattern=re.compile('^/$'), parameters=[])]
[Route(handler=<function bp_root at 0x7f908382f9d8>, methods=frozenset({'GET'}), pattern=re.compile('^/$'), parameters=[], name='my_blueprint.bp_root', uri='/')]
```
## Blueprint groups and nesting
@ -87,7 +87,7 @@ from sanic import Blueprint
from .static import static
from .authors import authors
content = Blueprint.group(assets, authors, url_prefix='/content')
content = Blueprint.group(static, authors, url_prefix='/content')
```
```python
# api/info.py
@ -254,5 +254,3 @@ async def root(request):
async def post_handler(request, post_id):
return text('Post {} in Blueprint V1'.format(post_id))
```

View File

@ -29,8 +29,8 @@ See it's that simple!
## Pull requests!
So the pull request approval rules are pretty simple:
1. All pull requests must pass unit tests
* All pull requests must be reviewed and approved by at least
* All pull requests must pass unit tests
* All pull requests must be reviewed and approved by at least
one current collaborator on the project
* All pull requests must pass flake8 checks
* If you decide to remove/change anything from any common interface

View File

@ -4,8 +4,13 @@ Make sure you have both [pip](https://pip.pypa.io/en/stable/installing/) and at
least version 3.5 of Python before starting. Sanic uses the new `async`/`await`
syntax, so earlier versions of python won't work.
1. Install Sanic: `python3 -m pip install sanic`
2. Create a file called `main.py` with the following code:
## 1. Install Sanic
```
python3 -m pip install sanic
```
## 2. Create a file called `main.py`
```python
from sanic import Sanic
@ -20,9 +25,16 @@ syntax, so earlier versions of python won't work.
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000)
```
3. Run the server: `python3 main.py`
4. Open the address `http://0.0.0.0:8000` in your web browser. You should see
the message *Hello world!*.
## 3. Run the server
```
python3 main.py
```
## 4. Check your browser
Open the address `http://0.0.0.0:8000` in your web browser. You should see
the message *Hello world!*.
You now have a working Sanic server!

View File

@ -17,7 +17,7 @@ string representing its type: `'request'` or `'response'`.
The simplest middleware doesn't modify the request or response at all:
```python
```
@app.middleware('request')
async def print_on_request(request):
print("I print when a request is received by the server")
@ -33,7 +33,7 @@ Middleware can modify the request or response parameter it is given, *as long
as it does not return it*. The following example shows a practical use-case for
this.
```python
```
app = Sanic(__name__)
@app.middleware('response')
@ -60,7 +60,7 @@ and the response will be returned. If this occurs to a request before the
relevant user route handler is reached, the handler will never be called.
Returning a response will also prevent any further middleware from running.
```python
```
@app.middleware('request')
async def halt_request(request):
return text('I halted the request')
@ -79,11 +79,11 @@ If you want to execute startup/teardown code as your server starts or closes, yo
- `before_server_stop`
- `after_server_stop`
These listeners are implemented as decorators on functions which accept the app object as well as the asyncio loop.
These listeners are implemented as decorators on functions which accept the app object as well as the asyncio loop.
For example:
```python
```
@app.listener('before_server_start')
async def setup_db(app, loop):
app.db = await db_setup()
@ -101,16 +101,16 @@ async def close_db(app, loop):
await app.db.close()
```
It's also possible to register a listener using the `register_listener` method.
It's also possible to register a listener using the `register_listener` method.
This may be useful if you define your listeners in another module besides
the one you instantiate your app in.
```python
```
app = Sanic()
async def setup_db(app, loop):
app.db = await db_setup()
app.register_listener(setup_db, 'before_server_start')
```
@ -118,7 +118,7 @@ app.register_listener(setup_db, 'before_server_start')
If you want to schedule a background task to run after the loop has started,
Sanic provides the `add_task` method to easily do so.
```python
```
async def notify_server_started_after_five_seconds():
await asyncio.sleep(5)
print('Server successfully started!')
@ -128,7 +128,7 @@ app.add_task(notify_server_started_after_five_seconds())
Sanic will attempt to automatically inject the app, passing it as an argument to the task:
```python
```
async def notify_server_started_after_five_seconds(app):
await asyncio.sleep(5)
print(app.name)
@ -138,7 +138,7 @@ app.add_task(notify_server_started_after_five_seconds)
Or you can pass the app explicitly for the same effect:
```python
```
async def notify_server_started_after_five_seconds(app):
await asyncio.sleep(5)
print(app.name)

66
docs/sanic/sockets.rst Normal file
View File

@ -0,0 +1,66 @@
Sockets
=======
Sanic can use the python
`socket module <https://docs.python.org/3/library/socket.html>`_ to accommodate
non IPv4 sockets.
IPv6 example:
.. code:: python
from sanic import Sanic
from sanic.response import json
import socket
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.bind(('::', 7777))
app = Sanic()
@app.route("/")
async def test(request):
return json({"hello": "world"})
if __name__ == "__main__":
app.run(sock=sock)
to test IPv6 ``curl -g -6 "http://[::1]:7777/"``
UNIX socket example:
.. code:: python
import signal
import sys
import socket
import os
from sanic import Sanic
from sanic.response import json
server_socket = '/tmp/sanic.sock'
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(server_socket)
app = Sanic()
@app.route("/")
async def test(request):
return json({"hello": "world"})
def signal_handler(sig, frame):
print('Exiting')
os.unlink(server_socket)
sys.exit(0)
if __name__ == "__main__":
app.run(sock=sock)
to test UNIX: ``curl -v --unix-socket /tmp/sanic.sock http://localhost/hello``

View File

@ -43,6 +43,7 @@ and ``recv`` methods to send and receive data respectively.
You could setup your own WebSocket configuration through ``app.config``, like
.. code:: python
app.config.WEBSOCKET_MAX_SIZE = 2 ** 20
app.config.WEBSOCKET_MAX_QUEUE = 32
app.config.WEBSOCKET_READ_LIMIT = 2 ** 16

View File

@ -17,5 +17,5 @@ dependencies:
- aiofiles>=0.3.0
- websockets>=6.0
- sphinxcontrib-asyncio>=0.2.0
- multidict>=4.0<5.0
- multidict>=4.0,<5.0
- https://github.com/channelcat/docutils-fork/zipball/master

View File

@ -1,6 +1,7 @@
from sanic.app import Sanic
from sanic.blueprints import Blueprint
__version__ = "0.8.3"
__all__ = ["Sanic", "Blueprint"]

View File

@ -1,8 +1,9 @@
from argparse import ArgumentParser
from importlib import import_module
from sanic.log import logger
from sanic.app import Sanic
from sanic.log import logger
if __name__ == "__main__":
parser = ArgumentParser(prog="sanic")
@ -51,5 +52,5 @@ if __name__ == "__main__":
" Example File: project/sanic_server.py -> app\n"
" Example Module: project.sanic_server.app".format(e.name)
)
except ValueError as e:
except ValueError:
logger.exception("Failed to run app")

View File

@ -1,29 +1,30 @@
import os
import logging
import logging.config
import os
import re
import warnings
from asyncio import get_event_loop, ensure_future, CancelledError
from collections import deque, defaultdict
from asyncio import CancelledError, ensure_future, get_event_loop
from collections import defaultdict, deque
from functools import partial
from inspect import getmodulename, isawaitable, signature, stack
from ssl import Purpose, create_default_context
from traceback import format_exc
from urllib.parse import urlencode, urlunparse
from ssl import create_default_context, Purpose
from sanic import reloader_helpers
from sanic.config import Config
from sanic.constants import HTTP_METHODS
from sanic.exceptions import ServerError, URLBuildError, SanicException
from sanic.exceptions import SanicException, ServerError, URLBuildError
from sanic.handlers import ErrorHandler
from sanic.log import logger, error_logger, LOGGING_CONFIG_DEFAULTS
from sanic.log import LOGGING_CONFIG_DEFAULTS, error_logger, logger
from sanic.response import HTTPResponse, StreamingHTTPResponse
from sanic.router import Router
from sanic.server import serve, serve_multiple, HttpProtocol, Signal
from sanic.server import HttpProtocol, Signal, serve, serve_multiple
from sanic.static import register as static_register
from sanic.testing import SanicTestClient
from sanic.views import CompositionView
from sanic.websocket import WebSocketProtocol, ConnectionClosed
import sanic.reloader_helpers as reloader_helpers
from sanic.websocket import ConnectionClosed, WebSocketProtocol
class Sanic:
@ -370,8 +371,7 @@ class Sanic:
):
"""Decorate a function to be registered as a websocket route
:param uri: path of the URL
:param subprotocols: optional list of strings with the supported
subprotocols
:param subprotocols: optional list of str with supported subprotocols
:param host:
:return: decorated function
"""
@ -567,7 +567,7 @@ class Sanic:
return self.blueprint(*args, **kwargs)
def url_for(self, view_name: str, **kwargs):
"""Build a URL based on a view name and the values provided.
r"""Build a URL based on a view name and the values provided.
In order to build a URL, all request parameters must be supplied as
keyword arguments, and each parameter must pass the test for the
@ -578,7 +578,7 @@ class Sanic:
the output URL's query string.
:param view_name: string referencing the view name
:param \*\*kwargs: keys and values that are used to build request
:param \**kwargs: keys and values that are used to build request
parameters and query string arguments.
:return: the built URL
@ -835,6 +835,14 @@ class Sanic:
access_log=True,
**kwargs
):
if "loop" in kwargs:
raise TypeError(
"loop is not a valid argument. To use an existing loop, "
"change to create_server().\nSee more: "
"https://sanic.readthedocs.io/en/latest/sanic/deploying.html"
"#asynchronous-support"
)
"""Run the HTTP Server and listen until keyboard interrupt or term
signal. On termination, drain connections before closing.

View File

@ -3,8 +3,9 @@ from collections import defaultdict, namedtuple
from sanic.constants import HTTP_METHODS
from sanic.views import CompositionView
FutureRoute = namedtuple(
"Route",
"FutureRoute",
[
"handler",
"uri",
@ -16,11 +17,15 @@ FutureRoute = namedtuple(
"name",
],
)
FutureListener = namedtuple("Listener", ["handler", "uri", "methods", "host"])
FutureMiddleware = namedtuple("Route", ["middleware", "args", "kwargs"])
FutureException = namedtuple("Route", ["handler", "args", "kwargs"])
FutureListener = namedtuple(
"FutureListener", ["handler", "uri", "methods", "host"]
)
FutureMiddleware = namedtuple(
"FutureMiddleware", ["middleware", "args", "kwargs"]
)
FutureException = namedtuple("FutureException", ["handler", "args", "kwargs"])
FutureStatic = namedtuple(
"Route", ["uri", "file_or_directory", "args", "kwargs"]
"FutureStatic", ["uri", "file_or_directory", "args", "kwargs"]
)

View File

@ -1,6 +1,7 @@
import re
import string
# ------------------------------------------------------------ #
# SimpleCookie
# ------------------------------------------------------------ #
@ -17,7 +18,7 @@ _Translator.update({ord('"'): '\\"', ord("\\"): "\\\\"})
def _quote(str):
"""Quote a string for use in a cookie header.
r"""Quote a string for use in a cookie header.
If the string does not need to be double-quoted, then just return the
string. Otherwise, surround the string in doublequotes and quote
(with a \) special characters.

View File

@ -1,5 +1,6 @@
from sanic.helpers import STATUS_CODES
TRACEBACK_STYLE = """
<style>
body {

View File

@ -1,20 +1,21 @@
import sys
from traceback import extract_tb, format_exc
from sanic.exceptions import (
ContentRangeError,
HeaderNotFound,
INTERNAL_SERVER_ERROR_HTML,
InvalidRangeType,
SanicException,
TRACEBACK_BORDER,
TRACEBACK_LINE_HTML,
TRACEBACK_STYLE,
TRACEBACK_WRAPPER_HTML,
TRACEBACK_WRAPPER_INNER_HTML,
TRACEBACK_BORDER,
ContentRangeError,
HeaderNotFound,
InvalidRangeType,
SanicException,
)
from sanic.log import logger
from sanic.response import text, html
from sanic.response import html, text
class ErrorHandler:
@ -166,17 +167,17 @@ class ContentRangeHandler:
)
else:
# this case represents `Content-Range: bytes 5-`
self.end = self.total
self.end = self.total - 1
else:
if self.start is None:
# this case represents `Content-Range: bytes -5`
self.start = self.total - self.end
self.end = self.total
self.end = self.total - 1
if self.start >= self.end:
raise ContentRangeError(
"Invalid for Content Range parameters", self
)
self.size = self.end - self.start
self.size = self.end - self.start + 1
self.headers = {
"Content-Range": "bytes %s-%s/%s"
% (self.start, self.end, self.total)

View File

@ -6,7 +6,7 @@ LOGGING_CONFIG_DEFAULTS = dict(
version=1,
disable_existing_loggers=False,
loggers={
"root": {"level": "INFO", "handlers": ["console"]},
"sanic.root": {"level": "INFO", "handlers": ["console"]},
"sanic.error": {
"level": "INFO",
"handlers": ["error_console"],

View File

@ -1,9 +1,10 @@
import os
import sys
import signal
import subprocess
from time import sleep
import sys
from multiprocessing import Process
from time import sleep
def _iter_module_files():

View File

@ -1,11 +1,17 @@
import sys
import json
import sys
from cgi import parse_header
from collections import namedtuple
from http.cookies import SimpleCookie
from httptools import parse_url
from urllib.parse import parse_qs, urlunparse
from httptools import parse_url
from sanic.exceptions import InvalidUsage
from sanic.log import error_logger, logger
try:
from ujson import loads as json_loads
except ImportError:
@ -18,8 +24,6 @@ except ImportError:
else:
json_loads = json.loads
from sanic.exceptions import InvalidUsage
from sanic.log import error_logger, logger
DEFAULT_HTTP_CONTENT_TYPE = "application/octet-stream"

View File

@ -1,17 +1,23 @@
from functools import partial
from mimetypes import guess_type
from os import path
from urllib.parse import quote_plus
try:
from ujson import dumps as json_dumps
except BaseException:
from json import dumps as json_dumps
from aiofiles import open as open_async
from multidict import CIMultiDict
from sanic.helpers import STATUS_CODES, has_message_body, remove_entity_headers
from sanic.cookies import CookieJar
from sanic.helpers import STATUS_CODES, has_message_body, remove_entity_headers
try:
from ujson import dumps as json_dumps
except BaseException:
from json import dumps
# This is done in order to ensure that the JSON response is
# kept consistent across both ujson and inbuilt json usage.
json_dumps = partial(dumps, separators=(",", ":"))
class BaseHTTPResponse:
@ -301,6 +307,7 @@ async def file(
_range.end,
_range.total,
)
status = 206
else:
out_stream = await _file.read()
@ -370,6 +377,7 @@ async def file_stream(
_range.end,
_range.total,
)
status = 206
return StreamingHTTPResponse(
streaming_fn=_streaming_fn,
status=status,
@ -421,7 +429,7 @@ def redirect(
headers = headers or {}
# URL Quote the URL before redirecting
safe_to = quote_plus(to, safe=":/#?&=@[]!$&'()*+,;")
safe_to = quote_plus(to, safe=":/%#?&=@[]!$&'()*+,;")
# According to RFC 7231, a relative URI is now permitted.
headers["Location"] = safe_to

View File

@ -1,13 +1,15 @@
import re
import uuid
from collections import defaultdict, namedtuple
from collections.abc import Iterable
from functools import lru_cache
from urllib.parse import unquote
from sanic.exceptions import NotFound, MethodNotSupported
from sanic.exceptions import MethodNotSupported, NotFound
from sanic.views import CompositionView
Route = namedtuple(
"Route", ["handler", "methods", "pattern", "parameters", "name", "uri"]
)

View File

@ -1,17 +1,31 @@
import asyncio
import os
import traceback
from functools import partial
from inspect import isawaitable
from multiprocessing import Process
from signal import SIGTERM, SIGINT, SIG_IGN, signal as signal_func, Signals
from socket import socket, SOL_SOCKET, SO_REUSEADDR
from signal import SIG_IGN, SIGINT, SIGTERM, Signals
from signal import signal as signal_func
from socket import SO_REUSEADDR, SOL_SOCKET, socket
from time import time
from httptools import HttpRequestParser
from httptools.parser.errors import HttpParserError
from multidict import CIMultiDict
from sanic.exceptions import (
InvalidUsage,
PayloadTooLarge,
RequestTimeout,
ServerError,
ServiceUnavailable,
)
from sanic.log import access_logger, logger
from sanic.request import Request
from sanic.response import HTTPResponse
try:
import uvloop
@ -19,16 +33,6 @@ try:
except ImportError:
pass
from sanic.log import logger, access_logger
from sanic.response import HTTPResponse
from sanic.request import Request
from sanic.exceptions import (
RequestTimeout,
PayloadTooLarge,
InvalidUsage,
ServerError,
ServiceUnavailable,
)
current_time = None

View File

@ -1,7 +1,7 @@
from mimetypes import guess_type
from os import path
from re import sub
from time import strftime, gmtime
from time import gmtime, strftime
from urllib.parse import unquote
from aiofiles.os import stat
@ -13,7 +13,7 @@ from sanic.exceptions import (
InvalidUsage,
)
from sanic.handlers import ContentRangeHandler
from sanic.response import file, file_stream, HTTPResponse
from sanic.response import HTTPResponse, file, file_stream
def register(

View File

@ -1,6 +1,7 @@
from json import JSONDecodeError
from sanic.log import logger
from sanic.exceptions import MethodNotSupported
from sanic.log import logger
from sanic.response import text
@ -33,7 +34,7 @@ class SanicTestClient:
) as response:
try:
response.text = await response.text()
except UnicodeDecodeError as e:
except UnicodeDecodeError:
response.text = None
try:

View File

@ -1,5 +1,5 @@
from sanic.exceptions import InvalidUsage
from sanic.constants import HTTP_METHODS
from sanic.exceptions import InvalidUsage
class HTTPMethodView:

View File

@ -1,8 +1,9 @@
from httptools import HttpParserUpgrade
from websockets import ConnectionClosed # noqa
from websockets import InvalidHandshake, WebSocketCommonProtocol, handshake
from sanic.exceptions import InvalidUsage
from sanic.server import HttpProtocol
from httptools import HttpParserUpgrade
from websockets import handshake, WebSocketCommonProtocol, InvalidHandshake
from websockets import ConnectionClosed # noqa
class WebSocketProtocol(HttpProtocol):

View File

@ -1,10 +1,16 @@
import os
import sys
import signal
import asyncio
import logging
import os
import signal
import sys
import traceback
import gunicorn.workers.base as base
from sanic.server import HttpProtocol, Signal, serve, trigger_events
from sanic.websocket import WebSocketProtocol
try:
import ssl
except ImportError:
@ -16,10 +22,6 @@ try:
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
pass
import gunicorn.workers.base as base
from sanic.server import trigger_events, serve, HttpProtocol, Signal
from sanic.websocket import WebSocketProtocol
class GunicornWorker(base.Worker):

View File

@ -2,3 +2,16 @@
# https://github.com/ambv/black#slices
# https://github.com/ambv/black#line-breaks--binary-operators
ignore = E203, W503
[isort]
atomic=true
default_section = THIRDPARTY
include_trailing_comma = true
known_first_party = sanic
known_third_party = pytest
line_length = 79
lines_after_imports = 2
lines_between_types = 1
multi_line_output = 3
not_skip = __init__.py

View File

@ -3,6 +3,7 @@ import asyncio
def test_bad_request_response(app):
lines = []
@app.listener('after_server_start')
async def _request(sanic, loop):
connect = asyncio.open_connection('127.0.0.1', 42101)

View File

@ -48,6 +48,20 @@ def test_load_env_prefix():
del environ["MYAPP_TEST_ANSWER"]
def test_load_env_prefix_float_values():
environ["MYAPP_TEST_ROI"] = "2.3"
app = Sanic(load_env="MYAPP_")
assert app.config.TEST_ROI == 2.3
del environ["MYAPP_TEST_ROI"]
def test_load_env_prefix_string_value():
environ["MYAPP_TEST_TOKEN"] = "somerandomtesttoken"
app = Sanic(load_env="MYAPP_")
assert app.config.TEST_TOKEN == "somerandomtesttoken"
del environ["MYAPP_TEST_TOKEN"]
def test_load_from_file(app):
config = dedent("""
VALUE = 'some value'

View File

@ -1,9 +1,8 @@
from datetime import datetime, timedelta
from http.cookies import SimpleCookie
from sanic import Sanic
from sanic.response import json, text
from sanic.response import text
import pytest
from sanic.cookies import Cookie
# ------------------------------------------------------------ #
# GET
@ -62,6 +61,7 @@ def test_false_cookies(app, httponly, expected):
assert ('HttpOnly' in response_cookies['right_back'].output()) == expected
def test_http2_cookies(app):
@app.route('/')
@ -74,6 +74,7 @@ def test_http2_cookies(app):
assert response.text == 'Cookies are: working!'
def test_cookie_options(app):
@app.route('/')
@ -81,7 +82,8 @@ def test_cookie_options(app):
response = text("OK")
response.cookies['test'] = 'at you'
response.cookies['test']['httponly'] = True
response.cookies['test']['expires'] = datetime.now() + timedelta(seconds=10)
response.cookies['test']['expires'] = (datetime.now() +
timedelta(seconds=10))
return response
request, response = app.test_client.get('/')
@ -89,7 +91,8 @@ def test_cookie_options(app):
response_cookies.load(response.headers.get('Set-Cookie', {}))
assert response_cookies['test'].value == 'at you'
assert response_cookies['test']['httponly'] == True
assert response_cookies['test']['httponly'] is True
def test_cookie_deletion(app):
@ -107,4 +110,23 @@ def test_cookie_deletion(app):
assert int(response_cookies['i_want_to_die']['max-age']) == 0
with pytest.raises(KeyError):
hold_my_beer = response.cookies['i_never_existed']
response.cookies['i_never_existed']
def test_cookie_reserved_cookie():
with pytest.raises(expected_exception=KeyError) as e:
Cookie("domain", "testdomain.com")
assert e.message == "Cookie name is a reserved word"
def test_cookie_illegal_key_format():
with pytest.raises(expected_exception=KeyError) as e:
Cookie("testå", "test")
assert e.message == "Cookie key contains illegal characters"
def test_cookie_set_unknown_property():
c = Cookie("test_cookie", "value")
with pytest.raises(expected_exception=KeyError) as e:
c["invalid"] = "value"
assert e.message == "Unknown cookie property"

View File

@ -28,6 +28,7 @@ def test_create_task(app):
request, response = app.test_client.get('/late')
assert response.body == b'True'
def test_create_task_with_app_arg(app):
q = Queue()

View File

@ -1,4 +1,3 @@
from sanic import Sanic
from sanic.response import text
from sanic.router import RouteExists
import pytest

View File

@ -131,7 +131,7 @@ def test_exception_handler_lookup():
try:
ModuleNotFoundError
except:
except Exception:
class ModuleNotFoundError(ImportError):
pass

14
tests/test_helpers.py Normal file
View File

@ -0,0 +1,14 @@
from sanic.helpers import has_message_body
def test_has_message_body():
tests = (
(100, False),
(102, False),
(204, False),
(200, True),
(304, False),
(400, True),
)
for status_code, expected in tests:
assert has_message_body(status_code) is expected

View File

@ -140,10 +140,11 @@ class ReuseableSanicTestClient(SanicTestClient):
if self._tcp_connector:
conn = self._tcp_connector
else:
conn = ReuseableTCPConnector(verify_ssl=False,
loop=self._loop,
keepalive_timeout=
request_keepalive)
conn = ReuseableTCPConnector(
verify_ssl=False,
loop=self._loop,
keepalive_timeout=request_keepalive
)
self._tcp_connector = conn
session = aiohttp.ClientSession(cookies=cookies,
connector=conn,

View File

@ -49,7 +49,7 @@ def test_logging_defaults():
reset_logging()
app = Sanic("test_logging")
for fmt in [h.formatter for h in logging.getLogger('root').handlers]:
for fmt in [h.formatter for h in logging.getLogger('sanic.root').handlers]:
assert fmt._fmt == LOGGING_CONFIG_DEFAULTS['formatters']['generic']['format']
for fmt in [h.formatter for h in logging.getLogger('sanic.error').handlers]:
@ -68,7 +68,7 @@ def test_logging_pass_customer_logconfig():
app = Sanic("test_logging", log_config=modified_config)
for fmt in [h.formatter for h in logging.getLogger('root').handlers]:
for fmt in [h.formatter for h in logging.getLogger('sanic.root').handlers]:
assert fmt._fmt == modified_config['formatters']['generic']['format']
for fmt in [h.formatter for h in logging.getLogger('sanic.error').handlers]:
@ -82,7 +82,7 @@ def test_logging_pass_customer_logconfig():
def test_log_connection_lost(app, debug, monkeypatch):
""" Should not log Connection lost exception on non debug """
stream = StringIO()
root = logging.getLogger('root')
root = logging.getLogger('sanic.root')
root.addHandler(logging.StreamHandler(stream))
monkeypatch.setattr(sanic.server, 'logger', root)
@ -102,3 +102,15 @@ def test_log_connection_lost(app, debug, monkeypatch):
assert 'Connection lost before response written @' in log
else:
assert 'Connection lost before response written @' not in log
def test_logging_modified_root_logger_config():
reset_logging()
modified_config = LOGGING_CONFIG_DEFAULTS
modified_config['loggers']['sanic.root']['level'] = 'DEBUG'
app = Sanic("test_logging", log_config=modified_config)
assert logging.getLogger('sanic.root').getEffectiveLevel() == logging.DEBUG

View File

@ -11,11 +11,11 @@ def test_middleware_request(app):
results = []
@app.middleware
async def handler(request):
async def handler1(request):
results.append(request)
@app.route('/')
async def handler(request):
async def handler2(request):
return text('OK')
request, response = app.test_client.get('/')
@ -28,7 +28,7 @@ def test_middleware_response(app):
results = []
@app.middleware('request')
async def process_response(request):
async def process_request(request):
results.append(request)
@app.middleware('response')
@ -68,6 +68,7 @@ def test_middleware_response_exception(app):
assert response.text == 'OK'
assert result['status_code'] == 404
def test_middleware_override_request(app):
@app.middleware
@ -134,4 +135,4 @@ def test_middleware_order(app):
request, response = app.test_client.get('/')
assert response.status == 200
assert order == [1,2,3,4,5,6]
assert order == [1, 2, 3, 4, 5, 6]

View File

@ -1,9 +1,11 @@
import multiprocessing
import random
import signal
import pickle
import pytest
from sanic.testing import HOST, PORT
from sanic.response import text
@pytest.mark.skipif(
@ -27,3 +29,54 @@ def test_multiprocessing(app):
app.run(HOST, PORT, workers=num_workers)
assert len(process_list) == num_workers
def test_multiprocessing_with_blueprint(app):
from sanic import Blueprint
# Selects a number at random so we can spot check
num_workers = random.choice(range(2, multiprocessing.cpu_count() * 2 + 1))
process_list = set()
def stop_on_alarm(*args):
for process in multiprocessing.active_children():
process_list.add(process.pid)
process.terminate()
signal.signal(signal.SIGALRM, stop_on_alarm)
signal.alarm(3)
bp = Blueprint('test_text')
app.blueprint(bp)
app.run(HOST, PORT, workers=num_workers)
assert len(process_list) == num_workers
# this function must be outside a test function so that it can be
# able to be pickled (local functions cannot be pickled).
def handler(request):
return text('Hello')
# Muliprocessing on Windows requires app to be able to be pickled
@pytest.mark.parametrize('protocol', [3, 4])
def test_pickle_app(app, protocol):
app.route('/')(handler)
p_app = pickle.dumps(app, protocol=protocol)
up_p_app = pickle.loads(p_app)
assert up_p_app
request, response = app.test_client.get('/')
assert response.text == 'Hello'
@pytest.mark.parametrize('protocol', [3, 4])
def test_pickle_app_with_bp(app, protocol):
from sanic import Blueprint
bp = Blueprint('test_text')
bp.route('/')(handler)
app.blueprint(bp)
p_app = pickle.dumps(app, protocol=protocol)
up_p_app = pickle.loads(p_app)
assert up_p_app
request, response = app.test_client.get('/')
assert app.is_request_stream is False
assert response.text == 'Hello'

View File

@ -336,7 +336,7 @@ def test_overload_routes(app):
return text('OK1')
@app.route('/overload', methods=['POST', 'PUT'], name='route_second')
async def handler1(request):
async def handler2(request):
return text('OK2')
request, response = app.test_client.get(app.url_for('route_first'))

View File

@ -1,4 +1,5 @@
import pytest
from urllib.parse import quote
from sanic.response import text, redirect
@ -19,15 +20,15 @@ def redirect_app(app):
return text('OK')
@app.route('/1')
def handler(request):
def handler1(request):
return redirect('/2')
@app.route('/2')
def handler(request):
def handler2(request):
return redirect('/3')
@app.route('/3')
def handler(request):
def handler3(request):
return text('OK')
@app.route('/redirect_with_header_injection')
@ -107,3 +108,25 @@ def test_redirect_with_header_injection(redirect_app):
assert response.status == 302
assert "test-header" not in response.headers
assert not response.text.startswith('test-body')
@pytest.mark.parametrize("test_str", ["sanic-test", "sanictest", "sanic test"])
async def test_redirect_with_params(app, test_client, test_str):
@app.route("/api/v1/test/<test>/")
async def init_handler(request, test):
assert test == test_str
return redirect("/api/v2/test/{}/".format(quote(test)))
@app.route("/api/v2/test/<test>/")
async def target_handler(request, test):
assert test == test_str
return text("OK")
test_cli = await test_client(app)
response = await test_cli.get("/api/v1/test/{}/".format(quote(test_str)))
assert response.status == 200
txt = await response.text()
assert txt == "OK"

View File

@ -1,4 +1,3 @@
import pytest
import asyncio
import contextlib
@ -34,7 +33,7 @@ async def test_request_cancel_when_connection_lost(loop, app, test_client):
assert app.still_serving_cancelled_request is False
async def test_stream_request_cancel_when_connection_lost(loop, app, test_client):
async def test_stream_request_cancel_when_conn_lost(loop, app, test_client):
app.still_serving_cancelled_request = False
@app.post('/post/<id>', stream=True)

View File

@ -18,7 +18,10 @@ def test_storage(app):
@app.route('/')
def handler(request):
return json({'user': request.get('user'), 'sidekick': request.get('sidekick')})
return json({
'user': request.get('user'),
'sidekick': request.get('sidekick')
})
request, response = app.test_client.get('/')

View File

@ -181,7 +181,8 @@ def test_request_stream_handle_exception(app):
# 405
request, response = app.test_client.get('/post/random_id', data=data)
assert response.status == 405
assert response.text == 'Error: Method GET not allowed for URL /post/random_id'
assert response.text == 'Error: Method GET not allowed for URL' \
' /post/random_id'
def test_request_stream_blueprint(app):

View File

@ -78,7 +78,7 @@ class DelayableTCPConnector(TCPConnector):
await asyncio.sleep(self.delay)
t = req.loop.time()
print("sending at {}".format(t), flush=True)
conn = next(iter(args)) # first arg is connection
next(iter(args)) # first arg is connection
try:
return await self.orig_send(*args, **kwargs)

View File

@ -97,7 +97,7 @@ def test_json(app):
results = json_loads(response.text)
assert results.get('test') == True
assert results.get('test') is True
def test_empty_json(app):
@ -278,22 +278,23 @@ def test_post_form_urlencoded(app):
payload = 'test=OK'
headers = {'content-type': 'application/x-www-form-urlencoded'}
request, response = app.test_client.post('/', data=payload, headers=headers)
request, response = app.test_client.post('/', data=payload,
headers=headers)
assert request.form.get('test') == 'OK'
@pytest.mark.parametrize(
'payload', [
'------sanic\r\n' \
'Content-Disposition: form-data; name="test"\r\n' \
'\r\n' \
'OK\r\n' \
'------sanic\r\n'
'Content-Disposition: form-data; name="test"\r\n'
'\r\n'
'OK\r\n'
'------sanic--\r\n',
'------sanic\r\n' \
'content-disposition: form-data; name="test"\r\n' \
'\r\n' \
'OK\r\n' \
'------sanic\r\n'
'content-disposition: form-data; name="test"\r\n'
'\r\n'
'OK\r\n'
'------sanic--\r\n',
])
def test_post_form_multipart_form_data(app, payload):
@ -362,3 +363,83 @@ def test_url_attributes_with_ssl(app, path, query, expected_url):
assert parsed.path == request.path
assert parsed.query == request.query_string
assert parsed.netloc == request.host
def test_form_with_multiple_values(app):
@app.route('/', methods=['POST'])
async def handler(request):
return text("OK")
payload="selectedItems=v1&selectedItems=v2&selectedItems=v3"
headers = {'content-type': 'application/x-www-form-urlencoded'}
request, response = app.test_client.post('/', data=payload,
headers=headers)
assert request.form.getlist("selectedItems") == ["v1", "v2", "v3"]
def test_request_string_representation(app):
@app.route('/', methods=["GET"])
async def get(request):
return text("OK")
request, _ = app.test_client.get("/")
assert repr(request) == '<Request: GET />'
@pytest.mark.parametrize(
'payload', [
'------sanic\r\n'
'Content-Disposition: form-data; filename="filename"; name="test"\r\n'
'\r\n'
'OK\r\n'
'------sanic--\r\n',
'------sanic\r\n'
'content-disposition: form-data; filename="filename"; name="test"\r\n'
'\r\n'
'content-type: application/json; {"field": "value"}\r\n'
'------sanic--\r\n',
])
def test_request_multipart_files(app, payload):
@app.route("/", methods=["POST"])
async def post(request):
return text("OK")
headers = {'content-type': 'multipart/form-data; boundary=----sanic'}
request, _ = app.test_client.post(data=payload, headers=headers)
assert request.files.get('test').name == "filename"
def test_request_multipart_file_with_json_content_type(app):
@app.route("/", methods=["POST"])
async def post(request):
return text("OK")
payload = '------sanic\r\nContent-Disposition: form-data; name="file"; filename="test.json"' \
'\r\nContent-Type: application/json\r\n\r\n\r\n------sanic--'
headers = {'content-type': 'multipart/form-data; boundary=------sanic'}
request, _ = app.test_client.post(data=payload, headers=headers)
assert request.files.get('file').type == 'application/json'
def test_request_multipart_with_multiple_files_and_type(app):
@app.route("/", methods=["POST"])
async def post(request):
return text("OK")
payload = '------sanic\r\nContent-Disposition: form-data; name="file"; filename="test.json"' \
'\r\nContent-Type: application/json\r\n\r\n\r\n' \
'------sanic\r\nContent-Disposition: form-data; name="file"; filename="some_file.pdf"\r\n' \
'Content-Type: application/pdf\r\n\r\n\r\n------sanic--'
headers = {'content-type': 'multipart/form-data; boundary=------sanic'}
request, _ = app.test_client.post(data=payload, headers=headers)
assert len(request.files.getlist('file')) == 2
assert request.files.getlist('file')[0].type == 'application/json'
assert request.files.getlist('file')[1].type == 'application/pdf'

View File

@ -1,20 +1,19 @@
import sys
import asyncio
import inspect
import os
from aiofiles import os as async_os
from mimetypes import guess_type
from random import choice
from unittest.mock import MagicMock
from urllib.parse import unquote
import pytest
from random import choice
from aiofiles import os as async_os
from sanic.response import (
HTTPResponse, stream, StreamingHTTPResponse, file, file_stream, json
)
from sanic.server import HttpProtocol
from sanic.testing import HOST, PORT
from unittest.mock import MagicMock
JSON_DATA = {'ok': True}
@ -38,9 +37,8 @@ async def sample_streaming_fn(response):
def test_method_not_allowed(app):
@app.get('/')
async def test(request):
async def test_get(request):
return response.json({'hello': 'world'})
request, response = app.test_client.head('/')
@ -49,19 +47,18 @@ def test_method_not_allowed(app):
request, response = app.test_client.post('/')
assert response.headers['Allow'] == 'GET'
@app.post('/')
async def test(request):
async def test_post(request):
return response.json({'hello': 'world'})
request, response = app.test_client.head('/')
assert response.status == 405
assert set(response.headers['Allow'].split(', ')) == set(['GET', 'POST'])
assert set(response.headers['Allow'].split(', ')) == {'GET', 'POST'}
assert response.headers['Content-Length'] == '0'
request, response = app.test_client.patch('/')
assert response.status == 405
assert set(response.headers['Allow'].split(', ')) == set(['GET', 'POST'])
assert set(response.headers['Allow'].split(', ')) == {'GET', 'POST'}
assert response.headers['Content-Length'] == '0'
@ -75,19 +72,62 @@ def test_response_header(app):
'CONTENT-TYPE': 'application/json'
})
is_windows = sys.platform in ['win32', 'cygwin']
request, response = app.test_client.get('/')
assert dict(response.headers) == {
'Connection': 'keep-alive',
'Keep-Alive': str(app.config.KEEP_ALIVE_TIMEOUT),
# response body contains an extra \r at the end if its windows
# TODO: this is the only place this difference shows up in our tests
# we should figure out a way to unify testing on both platforms
'Content-Length': '12' if is_windows else '11',
'Content-Length': '11',
'Content-Type': 'application/json',
}
def test_response_content_length(app):
@app.get("/response_with_space")
async def response_with_space(request):
return json({
"message": "Data",
"details": "Some Details"
}, headers={
'CONTENT-TYPE': 'application/json'
})
@app.get("/response_without_space")
async def response_without_space(request):
return json({
"message":"Data",
"details":"Some Details"
}, headers={
'CONTENT-TYPE': 'application/json'
})
_, response = app.test_client.get("/response_with_space")
content_length_for_response_with_space = response.headers.get("Content-Length")
_, response = app.test_client.get("/response_without_space")
content_length_for_response_without_space = response.headers.get("Content-Length")
assert content_length_for_response_with_space == content_length_for_response_without_space
assert content_length_for_response_with_space == '43'
def test_response_content_length_with_different_data_types(app):
@app.get("/")
async def get_data_with_different_types(request):
# Indentation issues in the Response is intentional. Please do not fix
return json({
'bool': True,
'none': None,
'string':'string',
'number': -1},
headers={
'CONTENT-TYPE': 'application/json'
})
_, response = app.test_client.get("/")
assert response.headers.get("Content-Length") == '55'
@pytest.fixture
def json_app(app):
@ -239,7 +279,8 @@ def get_file_content(static_file_directory, file_name):
return file.read()
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png'])
@pytest.mark.parametrize('file_name',
['test.file', 'decode me.txt', 'python.png'])
@pytest.mark.parametrize('status', [200, 401])
def test_file_response(app, file_name, static_file_directory, status):
@ -256,9 +297,15 @@ def test_file_response(app, file_name, static_file_directory, status):
assert 'Content-Disposition' not in response.headers
@pytest.mark.parametrize('source,dest', [
('test.file', 'my_file.txt'), ('decode me.txt', 'readme.md'), ('python.png', 'logo.png')])
def test_file_response_custom_filename(app, source, dest, static_file_directory):
@pytest.mark.parametrize(
'source,dest',
[
('test.file', 'my_file.txt'), ('decode me.txt', 'readme.md'),
('python.png', 'logo.png')
]
)
def test_file_response_custom_filename(app, source, dest,
static_file_directory):
@app.route('/files/<filename>', methods=['GET'])
def file_route(request, filename):
@ -269,7 +316,8 @@ def test_file_response_custom_filename(app, source, dest, static_file_directory)
request, response = app.test_client.get('/files/{}'.format(source))
assert response.status == 200
assert response.body == get_file_content(static_file_directory, source)
assert response.headers['Content-Disposition'] == 'attachment; filename="{}"'.format(dest)
assert response.headers['Content-Disposition'] == \
'attachment; filename="{}"'.format(dest)
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
@ -300,7 +348,8 @@ def test_file_head_response(app, file_name, static_file_directory):
get_file_content(static_file_directory, file_name))
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png'])
@pytest.mark.parametrize('file_name',
['test.file', 'decode me.txt', 'python.png'])
def test_file_stream_response(app, file_name, static_file_directory):
@app.route('/files/<filename>', methods=['GET'])
@ -316,9 +365,15 @@ def test_file_stream_response(app, file_name, static_file_directory):
assert 'Content-Disposition' not in response.headers
@pytest.mark.parametrize('source,dest', [
('test.file', 'my_file.txt'), ('decode me.txt', 'readme.md'), ('python.png', 'logo.png')])
def test_file_stream_response_custom_filename(app, source, dest, static_file_directory):
@pytest.mark.parametrize(
'source,dest',
[
('test.file', 'my_file.txt'), ('decode me.txt', 'readme.md'),
('python.png', 'logo.png')
]
)
def test_file_stream_response_custom_filename(app, source, dest,
static_file_directory):
@app.route('/files/<filename>', methods=['GET'])
def file_route(request, filename):
@ -329,7 +384,8 @@ def test_file_stream_response_custom_filename(app, source, dest, static_file_dir
request, response = app.test_client.get('/files/{}'.format(source))
assert response.status == 200
assert response.body == get_file_content(static_file_directory, source)
assert response.headers['Content-Disposition'] == 'attachment; filename="{}"'.format(dest)
assert response.headers['Content-Disposition'] == \
'attachment; filename="{}"'.format(dest)
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt'])
@ -350,8 +406,10 @@ def test_file_stream_head_response(app, file_name, static_file_directory):
headers=headers,
content_type=guess_type(file_path)[0] or 'text/plain')
else:
return file_stream(file_path, chunk_size=32, headers=headers,
mime_type=guess_type(file_path)[0] or 'text/plain')
return file_stream(
file_path, chunk_size=32, headers=headers,
mime_type=guess_type(file_path)[0] or 'text/plain'
)
request, response = app.test_client.head('/files/{}'.format(file_name))
assert response.status == 200

View File

@ -64,12 +64,12 @@ def test_shorthand_routes_multiple(app):
def test_route_strict_slash(app):
@app.get('/get', strict_slashes=True)
def handler(request):
def handler1(request):
assert request.stream is None
return text('OK')
@app.post('/post/', strict_slashes=True)
def handler(request):
def handler2(request):
assert request.stream is None
return text('OK')
@ -133,11 +133,11 @@ def test_route_strict_slash_default_value_can_be_overwritten():
def test_route_slashes_overload(app):
@app.get('/hello/')
def handler(request):
def handler_get(request):
return text('OK')
@app.post('/hello/')
def handler(request):
def handler_post(request):
return text('OK')
request, response = app.test_client.get('/hello')
@ -408,7 +408,8 @@ def test_dynamic_route_uuid(app):
results.append(unique_id)
return text('OK')
request, response = app.test_client.get('/quirky/123e4567-e89b-12d3-a456-426655440000')
url = '/quirky/123e4567-e89b-12d3-a456-426655440000'
request, response = app.test_client.get(url)
assert response.text == 'OK'
assert type(results[0]) is uuid.UUID
@ -532,11 +533,11 @@ def test_route_duplicate(app):
with pytest.raises(RouteExists):
@app.route('/test/<dynamic>/')
async def handler1(request, dynamic):
async def handler3(request, dynamic):
pass
@app.route('/test/<dynamic>/')
async def handler2(request, dynamic):
async def handler4(request, dynamic):
pass
@ -882,12 +883,12 @@ def test_unmergeable_overload_routes(app):
assert response.text == 'OK1'
@app.route('/overload_part', methods=['GET'])
async def handler1(request):
async def handler3(request):
return text('OK1')
with pytest.raises(RouteExists):
@app.route('/overload_part')
async def handler2(request):
async def handler4(request):
return text('Duplicated')
request, response = app.test_client.get('/overload_part')

View File

@ -11,12 +11,15 @@ async def stop(app, loop):
calledq = Queue()
def set_loop(app, loop):
loop.add_signal_handler = MagicMock()
def after(app, loop):
calledq.put(loop.add_signal_handler.called)
def test_register_system_signals(app):
"""Test if sanic register system signals"""
@ -29,7 +32,7 @@ def test_register_system_signals(app):
app.listener('after_server_stop')(after)
app.run(HOST, PORT)
assert calledq.get() == True
assert calledq.get() is True
def test_dont_register_system_signals(app):
@ -44,4 +47,4 @@ def test_dont_register_system_signals(app):
app.listener('after_server_stop')(after)
app.run(HOST, PORT, register_sys_signals=False)
assert calledq.get() == False
assert calledq.get() is False

View File

@ -23,7 +23,8 @@ def get_file_content(static_file_directory, file_name):
return file.read()
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png'])
@pytest.mark.parametrize('file_name',
['test.file', 'decode me.txt', 'python.png'])
def test_static_file(app, static_file_directory, file_name):
app.static(
'/testing.file', get_file_path(static_file_directory, file_name))
@ -83,11 +84,11 @@ def test_static_content_range_correct(app, file_name, static_file_directory):
'Range': 'bytes=12-19'
}
request, response = app.test_client.get('/testing.file', headers=headers)
assert response.status == 200
assert response.status == 206
assert 'Content-Length' in response.headers
assert 'Content-Range' in response.headers
static_content = bytes(get_file_content(
static_file_directory, file_name))[12:19]
static_file_directory, file_name))[12:20]
assert int(response.headers[
'Content-Length']) == len(static_content)
assert response.body == static_content
@ -103,7 +104,7 @@ def test_static_content_range_front(app, file_name, static_file_directory):
'Range': 'bytes=12-'
}
request, response = app.test_client.get('/testing.file', headers=headers)
assert response.status == 200
assert response.status == 206
assert 'Content-Length' in response.headers
assert 'Content-Range' in response.headers
static_content = bytes(get_file_content(
@ -123,7 +124,7 @@ def test_static_content_range_back(app, file_name, static_file_directory):
'Range': 'bytes=-12'
}
request, response = app.test_client.get('/testing.file', headers=headers)
assert response.status == 200
assert response.status == 206
assert 'Content-Length' in response.headers
assert 'Content-Range' in response.headers
static_content = bytes(get_file_content(
@ -143,8 +144,8 @@ def test_static_content_range_empty(app, file_name, static_file_directory):
assert response.status == 200
assert 'Content-Length' in response.headers
assert 'Content-Range' not in response.headers
assert int(response.headers[
'Content-Length']) == len(get_file_content(static_file_directory, file_name))
assert int(response.headers['Content-Length']) == \
len(get_file_content(static_file_directory, file_name))
assert response.body == bytes(
get_file_content(static_file_directory, file_name))
@ -166,7 +167,8 @@ def test_static_content_range_error(app, file_name, static_file_directory):
len(get_file_content(static_file_directory, file_name)),)
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png'])
@pytest.mark.parametrize('file_name',
['test.file', 'decode me.txt', 'python.png'])
def test_static_file_specified_host(app, static_file_directory, file_name):
app.static(
'/testing.file',

View File

@ -13,12 +13,16 @@ URL_FOR_ARGS1 = dict(arg1=['v1', 'v2'])
URL_FOR_VALUE1 = '/myurl?arg1=v1&arg1=v2'
URL_FOR_ARGS2 = dict(arg1=['v1', 'v2'], _anchor='anchor')
URL_FOR_VALUE2 = '/myurl?arg1=v1&arg1=v2#anchor'
URL_FOR_ARGS3 = dict(arg1='v1', _anchor='anchor', _scheme='http',
_server='{}:{}'.format(test_host, test_port), _external=True)
URL_FOR_VALUE3 = 'http://{}:{}/myurl?arg1=v1#anchor'.format(test_host, test_port)
URL_FOR_ARGS3 = dict(
arg1='v1', _anchor='anchor', _scheme='http',
_server='{}:{}'.format(test_host, test_port), _external=True
)
URL_FOR_VALUE3 = 'http://{}:{}/myurl?arg1=v1#anchor'.format(test_host,
test_port)
URL_FOR_ARGS4 = dict(arg1='v1', _anchor='anchor', _external=True,
_server='http://{}:{}'.format(test_host, test_port))
URL_FOR_VALUE4 = 'http://{}:{}/myurl?arg1=v1#anchor'.format(test_host, test_port)
URL_FOR_VALUE4 = 'http://{}:{}/myurl?arg1=v1#anchor'.format(test_host,
test_port)
def _generate_handlers_from_names(app, l):

View File

@ -25,7 +25,8 @@ def get_file_content(static_file_directory, file_name):
return file.read()
@pytest.mark.parametrize('file_name', ['test.file', 'decode me.txt', 'python.png'])
@pytest.mark.parametrize('file_name',
['test.file', 'decode me.txt', 'python.png'])
def test_static_file(app, static_file_directory, file_name):
app.static(
'/testing.file', get_file_path(static_file_directory, file_name))
@ -211,11 +212,11 @@ def test_static_content_range_correct(app, file_name, static_file_directory):
assert uri == app.url_for('static', name='static', filename='any')
request, response = app.test_client.get(uri, headers=headers)
assert response.status == 200
assert response.status == 206
assert 'Content-Length' in response.headers
assert 'Content-Range' in response.headers
static_content = bytes(get_file_content(
static_file_directory, file_name))[12:19]
static_file_directory, file_name))[12:20]
assert int(response.headers[
'Content-Length']) == len(static_content)
assert response.body == static_content
@ -232,11 +233,11 @@ def test_static_content_range_correct(app, file_name, static_file_directory):
filename='any')
request, response = app.test_client.get(uri, headers=headers)
assert response.status == 200
assert response.status == 206
assert 'Content-Length' in response.headers
assert 'Content-Range' in response.headers
static_content = bytes(get_file_content(
static_file_directory, file_name))[12:19]
static_file_directory, file_name))[12:20]
assert int(response.headers[
'Content-Length']) == len(static_content)
assert response.body == static_content
@ -262,7 +263,7 @@ def test_static_content_range_front(app, file_name, static_file_directory):
assert uri == app.url_for('static', name='static', filename='any')
request, response = app.test_client.get(uri, headers=headers)
assert response.status == 200
assert response.status == 206
assert 'Content-Length' in response.headers
assert 'Content-Range' in response.headers
static_content = bytes(get_file_content(
@ -283,7 +284,7 @@ def test_static_content_range_front(app, file_name, static_file_directory):
filename='any')
request, response = app.test_client.get(uri, headers=headers)
assert response.status == 200
assert response.status == 206
assert 'Content-Length' in response.headers
assert 'Content-Range' in response.headers
static_content = bytes(get_file_content(
@ -313,7 +314,7 @@ def test_static_content_range_back(app, file_name, static_file_directory):
assert uri == app.url_for('static', name='static', filename='any')
request, response = app.test_client.get(uri, headers=headers)
assert response.status == 200
assert response.status == 206
assert 'Content-Length' in response.headers
assert 'Content-Range' in response.headers
static_content = bytes(get_file_content(
@ -334,7 +335,7 @@ def test_static_content_range_back(app, file_name, static_file_directory):
filename='any')
request, response = app.test_client.get(uri, headers=headers)
assert response.status == 200
assert response.status == 206
assert 'Content-Length' in response.headers
assert 'Content-Range' in response.headers
static_content = bytes(get_file_content(
@ -364,8 +365,8 @@ def test_static_content_range_empty(app, file_name, static_file_directory):
assert response.status == 200
assert 'Content-Length' in response.headers
assert 'Content-Range' not in response.headers
assert int(response.headers[
'Content-Length']) == len(get_file_content(static_file_directory, file_name))
assert int(response.headers['Content-Length']) == \
len(get_file_content(static_file_directory, file_name))
assert response.body == bytes(
get_file_content(static_file_directory, file_name))
@ -384,8 +385,8 @@ def test_static_content_range_empty(app, file_name, static_file_directory):
assert response.status == 200
assert 'Content-Length' in response.headers
assert 'Content-Range' not in response.headers
assert int(response.headers[
'Content-Length']) == len(get_file_content(static_file_directory, file_name))
assert int(response.headers['Content-Length']) == \
len(get_file_content(static_file_directory, file_name))
assert response.body == bytes(
get_file_content(static_file_directory, file_name))

View File

@ -4,11 +4,11 @@ from sanic.response import text
def test_vhosts(app):
@app.route('/', host="example.com")
async def handler(request):
async def handler1(request):
return text("You're at example.com!")
@app.route('/', host="subdomain.example.com")
async def handler(request):
async def handler2(request):
return text("You're at subdomain.example.com!")
headers = {"Host": "example.com"}
@ -38,11 +38,11 @@ def test_vhosts_with_list(app):
def test_vhosts_with_defaults(app):
@app.route('/', host="hello.com")
async def handler(request):
async def handler1(request):
return text("Hello, world!")
@app.route('/')
async def handler(request):
async def handler2(request):
return text("default")
headers = {"Host": "hello.com"}

View File

@ -129,7 +129,7 @@ def test_with_middleware_response(app):
results = []
@app.middleware('request')
async def process_response(request):
async def process_request(request):
results.append(request)
@app.middleware('response')
@ -162,7 +162,8 @@ def test_with_custom_class_methods(app):
def get(self, request):
self._iternal_method()
return text('I am get method and global var is {}'.format(self.global_var))
return text('I am get method and global var '
'is {}'.format(self.global_var))
app.add_route(DummyView.as_view(), '/')
request, response = app.test_client.get('/')

View File

@ -102,8 +102,8 @@ def test_run_max_requests_exceeded(worker):
assert not worker.alive
worker.notify.assert_called_with()
worker.log.info.assert_called_with("Max requests exceeded, shutting down: %s",
worker)
worker.log.info.assert_called_with("Max requests exceeded, shutting "
"down: %s", worker)
def test_worker_close(worker):
@ -125,7 +125,8 @@ def test_worker_close(worker):
worker.loop = loop
server = mock.Mock()
server.close = mock.Mock(wraps=lambda *a, **kw: None)
server.wait_closed = mock.Mock(wraps=asyncio.coroutine(lambda *a, **kw: None))
server.wait_closed = mock.Mock(wraps=asyncio.coroutine(
lambda *a, **kw: None))
worker.servers = {
server: {"requests_count": 14},
}

View File

@ -25,10 +25,12 @@ commands =
deps =
flake8
black
isort
commands =
flake8 sanic
black --check --verbose sanic
isort --check-only --recursive sanic
[testenv:check]
deps =