Create SanicASGITestClient and refactor ASGI methods

This commit is contained in:
Adam Hopkins 2019-05-21 19:30:55 +03:00
parent 4767a67acd
commit 8a56da84e6
4 changed files with 1390 additions and 282 deletions

View File

@ -16,6 +16,7 @@ from typing import Any, Optional, Type, Union
from urllib.parse import urlencode, urlunparse
from sanic import reloader_helpers
from sanic.asgi import ASGIApp
from sanic.blueprint_group import BlueprintGroup
from sanic.config import BASE_LOGO, Config
from sanic.constants import HTTP_METHODS
@ -27,7 +28,7 @@ from sanic.request import Request
from sanic.router import Router
from sanic.server import HttpProtocol, Signal, serve, serve_multiple
from sanic.static import register as static_register
from sanic.testing import SanicTestClient
from sanic.testing import SanicTestClient, SanicASGITestClient
from sanic.views import CompositionView
from sanic.websocket import ConnectionClosed, WebSocketProtocol
@ -981,7 +982,9 @@ class Sanic:
raise CancelledError()
# pass the response to the correct callback
if write_callback is None or isinstance(response, StreamingHTTPResponse):
if write_callback is None or isinstance(
response, StreamingHTTPResponse
):
await stream_callback(response)
else:
write_callback(response)
@ -994,6 +997,10 @@ class Sanic:
def test_client(self):
return SanicTestClient(self)
@property
def asgi_client(self):
return SanicASGITestClient(self)
# -------------------------------------------------------------------- #
# Execution
# -------------------------------------------------------------------- #
@ -1120,9 +1127,6 @@ class Sanic:
"""This kills the Sanic"""
get_event_loop().stop()
def __call__(self, scope):
return ASGIApp(self, scope)
async def create_server(
self,
host: Optional[str] = None,
@ -1365,79 +1369,10 @@ class Sanic:
parts = [self.name, *parts]
return ".".join(parts)
# -------------------------------------------------------------------- #
# ASGI
# -------------------------------------------------------------------- #
class MockTransport:
def __init__(self, scope):
self.scope = scope
def get_extra_info(self, info):
if info == 'peername':
return self.scope.get('server')
elif info == 'sslcontext':
return self.scope.get('scheme') in ["https", "wss"]
class ASGIApp:
def __init__(self, sanic_app, scope):
self.sanic_app = sanic_app
url_bytes = scope.get('root_path', '') + scope['path']
url_bytes = url_bytes.encode('latin-1')
url_bytes += scope['query_string']
headers = CIMultiDict([
(key.decode('latin-1'), value.decode('latin-1'))
for key, value in scope.get('headers', [])
])
version = scope['http_version']
method = scope['method']
self.request = Request(url_bytes, headers, version, method, MockTransport(scope))
self.request.app = sanic_app
async def read_body(self, receive):
"""
Read and return the entire body from an incoming ASGI message.
"""
body = b''
more_body = True
while more_body:
message = await receive()
body += message.get('body', b'')
more_body = message.get('more_body', False)
return body
async def __call__(self, receive, send):
"""
Handle the incoming request.
"""
self.send = send
self.request.body = await self.read_body(receive)
handler = self.sanic_app.handle_request
await handler(self.request, None, self.stream_callback)
async def stream_callback(self, response):
"""
Write the response.
"""
if isinstance(response, StreamingHTTPResponse):
raise NotImplementedError('Not supported')
headers = [
(str(name).encode('latin-1'), str(value).encode('latin-1'))
for name, value in response.headers.items()
]
if 'content-length' not in response.headers:
headers += [(
b'content-length',
str(len(response.body)).encode('latin-1')
)]
await self.send({
'type': 'http.response.start',
'status': response.status,
'headers': headers
})
await self.send({
'type': 'http.response.body',
'body': response.body,
'more_body': False
})
async def __call__(self, scope, receive, send):
asgi_app = ASGIApp(self, scope, receive, send)
await asgi_app()

93
sanic/asgi.py Normal file
View File

@ -0,0 +1,93 @@
from sanic.request import Request
from multidict import CIMultiDict
from sanic.response import StreamingHTTPResponse
class MockTransport:
def __init__(self, scope):
self.scope = scope
def get_extra_info(self, info):
if info == "peername":
return self.scope.get("server")
elif info == "sslcontext":
return self.scope.get("scheme") in ["https", "wss"]
class ASGIApp:
def __init__(self, sanic_app, scope, receive, send):
self.sanic_app = sanic_app
self.receive = receive
self.send = send
url_bytes = scope.get("root_path", "") + scope["path"]
url_bytes = url_bytes.encode("latin-1")
url_bytes += scope["query_string"]
headers = CIMultiDict(
[
(key.decode("latin-1"), value.decode("latin-1"))
for key, value in scope.get("headers", [])
]
)
version = scope["http_version"]
method = scope["method"]
self.request = Request(
url_bytes,
headers,
version,
method,
MockTransport(scope),
sanic_app,
)
async def read_body(self):
"""
Read and return the entire body from an incoming ASGI message.
"""
body = b""
more_body = True
while more_body:
message = await self.receive()
body += message.get("body", b"")
more_body = message.get("more_body", False)
return body
async def __call__(self):
"""
Handle the incoming request.
"""
self.request.body = await self.read_body()
handler = self.sanic_app.handle_request
await handler(self.request, None, self.stream_callback)
async def stream_callback(self, response):
"""
Write the response.
"""
if isinstance(response, StreamingHTTPResponse):
raise NotImplementedError("Not supported")
headers = [
(str(name).encode("latin-1"), str(value).encode("latin-1"))
for name, value in response.headers.items()
]
if "content-length" not in response.headers:
headers += [
(b"content-length", str(len(response.body)).encode("latin-1"))
]
await self.send(
{
"type": "http.response.start",
"status": response.status,
"headers": headers,
}
)
await self.send(
{
"type": "http.response.body",
"body": response.body,
"more_body": False,
}
)

View File

@ -1,42 +1,25 @@
from json import JSONDecodeError
from socket import socket
from urllib.parse import unquote, urljoin, urlsplit
import httpcore
import requests_async as requests
import websockets
import asyncio
import http
import io
import json
import queue
import threading
import types
import typing
from urllib.parse import unquote, urljoin, urlparse, parse_qs
import requests
from starlette.types import ASGIApp, Message, Scope
from starlette.websockets import WebSocketDisconnect
import websockets
from sanic.asgi import ASGIApp
from sanic.exceptions import MethodNotSupported
from sanic.log import logger
from sanic.response import text
HOST = "127.0.0.1"
PORT = 42101
class SanicTestClient:
def __init__(self, app, port=PORT):
"""Use port=None to bind to a random port"""
self.app = app
self.raise_server_exceptions = raise_server_exceptions
def send( # type: ignore
self, request: requests.PreparedRequest, *args: typing.Any, **kwargs: typing.Any
) -> requests.Response:
scheme, netloc, path, params, query, fragement = urlparse( # type: ignore
request.url
)
self.port = port
def get_new_session(self):
return requests.Session()
@ -83,75 +66,27 @@ class SanicTestClient:
debug=False,
server_kwargs={"auto_reload": False},
*request_args,
**request_kwargs
**request_kwargs,
):
results = [None, None]
exceptions = []
# Include other request headers.
headers += [
(key.lower().encode(), value.encode())
for key, value in request.headers.items()
]
if gather_request:
if scheme in {"ws", "wss"}:
subprotocol = request.headers.get("sec-websocket-protocol", None)
if subprotocol is None:
subprotocols = [] # type: typing.Sequence[str]
def _collect_request(request):
if results[0] is None:
results[0] = request
self.app.request_middleware.appendleft(_collect_request)
@self.app.exception(MethodNotSupported)
async def error_handler(request, exception):
if request.method in ["HEAD", "PATCH", "PUT", "DELETE"]:
return text(
"", exception.status_code, headers=exception.headers
)
else:
subprotocols = [value.strip() for value in subprotocol.split(",")]
scope = {
"type": "websocket",
"path": unquote(path),
"root_path": "",
"scheme": scheme,
"query_string": query.encode(),
"headers": headers,
"client": ["testclient", 50000],
"server": [host, port],
"subprotocols": subprotocols,
}
session = WebSocketTestSession(self.app, scope)
raise _Upgrade(session)
scope = {
"type": "http",
"http_version": "1.1",
"method": request.method,
"path": unquote(path),
"root_path": "",
"scheme": scheme,
"query_string": query.encode(),
"headers": headers,
"client": ["testclient", 50000],
"server": [host, port],
"extensions": {"http.response.template": {}},
}
async def receive() -> Message:
nonlocal request_complete, response_complete
if request_complete:
while not response_complete:
await asyncio.sleep(0.0001)
return {"type": "http.disconnect"}
body = request.body
if isinstance(body, str):
body_bytes = body.encode("utf-8") # type: bytes
elif body is None:
body_bytes = b""
elif isinstance(body, types.GeneratorType):
try:
chunk = body.send(None)
if isinstance(chunk, str):
chunk = chunk.encode("utf-8")
return {"type": "http.request", "body": chunk, "more_body": True}
except StopIteration:
request_complete = True
return {"type": "http.request", "body": b""}
else:
body_bytes = body
return self.app.error_handler.default(request, exception)
if self.port:
server_kwargs = dict(host=HOST, port=self.port, **server_kwargs)
@ -179,6 +114,151 @@ class SanicTestClient:
response = await self._local_request(
method, url, *request_args, **request_kwargs
)
results[-1] = response
except Exception as e:
logger.exception("Exception")
exceptions.append(e)
self.app.stop()
self.app.run(debug=debug, **server_kwargs)
self.app.listeners["after_server_start"].pop()
if exceptions:
raise ValueError("Exception during request: {}".format(exceptions))
if gather_request:
try:
request, response = results
return request, response
except BaseException:
raise ValueError(
"Request and response object expected, got ({})".format(
results
)
)
else:
try:
return results[-1]
except BaseException:
raise ValueError(
"Request object expected, got ({})".format(results)
)
def get(self, *args, **kwargs):
return self._sanic_endpoint_test("get", *args, **kwargs)
def post(self, *args, **kwargs):
return self._sanic_endpoint_test("post", *args, **kwargs)
def put(self, *args, **kwargs):
return self._sanic_endpoint_test("put", *args, **kwargs)
def delete(self, *args, **kwargs):
return self._sanic_endpoint_test("delete", *args, **kwargs)
def patch(self, *args, **kwargs):
return self._sanic_endpoint_test("patch", *args, **kwargs)
def options(self, *args, **kwargs):
return self._sanic_endpoint_test("options", *args, **kwargs)
def head(self, *args, **kwargs):
return self._sanic_endpoint_test("head", *args, **kwargs)
def websocket(self, *args, **kwargs):
return self._sanic_endpoint_test("websocket", *args, **kwargs)
class SanicASGIAdapter(requests.asgi.ASGIAdapter):
async def send( # type: ignore
self,
request: requests.PreparedRequest,
gather_return: bool = False,
*args: typing.Any,
**kwargs: typing.Any,
) -> requests.Response:
scheme, netloc, path, query, fragment = urlsplit(
request.url
) # type: ignore
default_port = {"http": 80, "ws": 80, "https": 443, "wss": 443}[scheme]
if ":" in netloc:
host, port_string = netloc.split(":", 1)
port = int(port_string)
else:
host = netloc
port = default_port
# Include the 'host' header.
if "host" in request.headers:
headers = [] # type: typing.List[typing.Tuple[bytes, bytes]]
elif port == default_port:
headers = [(b"host", host.encode())]
else:
headers = [(b"host", (f"{host}:{port}").encode())]
# Include other request headers.
headers += [
(key.lower().encode(), value.encode())
for key, value in request.headers.items()
]
scope = {
"type": "http",
"http_version": "1.1",
"method": request.method,
"path": unquote(path),
"root_path": "",
"scheme": scheme,
"query_string": query.encode(),
"headers": headers,
"client": ["testclient", 50000],
"server": [host, port],
"extensions": {"http.response.template": {}},
}
async def receive():
nonlocal request_complete, response_complete
if request_complete:
while not response_complete:
await asyncio.sleep(0.0001)
return {"type": "http.disconnect"}
body = request.body
if isinstance(body, str):
body_bytes = body.encode("utf-8") # type: bytes
elif body is None:
body_bytes = b""
elif isinstance(body, types.GeneratorType):
try:
chunk = body.send(None)
if isinstance(chunk, str):
chunk = chunk.encode("utf-8")
return {
"type": "http.request",
"body": chunk,
"more_body": True,
}
except StopIteration:
request_complete = True
return {"type": "http.request", "body": b""}
else:
body_bytes = body
request_complete = True
return {"type": "http.request", "body": body_bytes}
async def send(message) -> None:
nonlocal raw_kwargs, response_started, response_complete, template, context
if message["type"] == "http.response.start":
assert (
not response_started
), 'Received multiple "http.response.start" messages.'
raw_kwargs["status_code"] = message["status"]
raw_kwargs["headers"] = message["headers"]
response_started = True
elif message["type"] == "http.response.body":
assert (
@ -190,9 +270,8 @@ class SanicTestClient:
body = message.get("body", b"")
more_body = message.get("more_body", False)
if request.method != "HEAD":
raw_kwargs["body"].write(body)
raw_kwargs["body"] += body
if not more_body:
raw_kwargs["body"].seek(0)
response_complete = True
elif message["type"] == "http.response.template":
template = message["template"]
@ -201,155 +280,200 @@ class SanicTestClient:
request_complete = False
response_started = False
response_complete = False
raw_kwargs = {"body": io.BytesIO()} # type: typing.Dict[str, typing.Any]
raw_kwargs = {"body": b""} # type: typing.Dict[str, typing.Any]
template = None
context = None
return_value = None
self.app.run(debug=debug, **server_kwargs)
self.app.listeners["after_server_start"].pop()
self.app.is_running = True
try:
connection = self.app(scope)
loop.run_until_complete(connection(receive, send))
return_value = await self.app(scope, receive, send)
except BaseException as exc:
if self.raise_server_exceptions:
if not self.suppress_exceptions:
raise exc from None
if self.raise_server_exceptions:
if not self.suppress_exceptions:
assert response_started, "TestClient did not receive any response."
elif not response_started:
raw_kwargs = {
"version": 11,
"status": 500,
"reason": "Internal Server Error",
"headers": [],
"preload_content": False,
"original_response": _MockOriginalResponse([]),
"body": io.BytesIO(),
}
raw_kwargs = {"status_code": 500, "headers": []}
raw = requests.packages.urllib3.HTTPResponse(**raw_kwargs)
raw = httpcore.Response(**raw_kwargs)
response = self.build_response(request, raw)
if template is not None:
response.template = template
response.context = context
if gather_return:
response.return_value = return_value
return response
class SanicTestClient(requests.Session):
__test__ = False # For pytest to not discover this up.
class TestASGIApp(ASGIApp):
async def __call__(self):
await super().__call__()
return self.request
async def app_call_with_return(self, scope, receive, send):
asgi_app = TestASGIApp(self, scope, receive, send)
return await asgi_app()
class SanicASGITestClient(requests.ASGISession):
def __init__(
self,
app: ASGIApp,
base_url: str = "http://%s:%d" % (HOST, PORT),
raise_server_exceptions: bool = True,
app: "Sanic",
base_url: str = "http://mockserver",
suppress_exceptions: bool = False,
) -> None:
super(SanicTestClient, self).__init__()
adapter = _ASGIAdapter(app, raise_server_exceptions=raise_server_exceptions)
app.__class__.__call__ = app_call_with_return
super().__init__(app)
adapter = SanicASGIAdapter(
app, suppress_exceptions=suppress_exceptions
)
self.mount("http://", adapter)
self.mount("https://", adapter)
self.mount("ws://", adapter)
self.mount("wss://", adapter)
self.headers.update({"user-agent": "testclient"})
self.app = app
self.base_url = base_url
def request(
self,
method: str,
url: str = '/',
params: Params = None,
data: DataType = None,
headers: typing.MutableMapping[str, str] = None,
cookies: Cookies = None,
files: FileType = None,
auth: AuthType = None,
timeout: TimeOut = None,
allow_redirects: bool = None,
proxies: typing.MutableMapping[str, str] = None,
hooks: typing.Any = None,
stream: bool = None,
verify: typing.Union[bool, str] = None,
cert: typing.Union[str, typing.Tuple[str, str]] = None,
json: typing.Any = None,
debug = None,
gather_request = True
) -> requests.Response:
if debug is not None:
self.app.debug = debug
async def send(self, *args, **kwargs):
return await super().send(*args, **kwargs)
url = urljoin(self.base_url, url)
response = super().request(
method,
url,
params=params,
data=data,
headers=headers,
cookies=cookies,
files=files,
auth=auth,
timeout=timeout,
allow_redirects=allow_redirects,
proxies=proxies,
hooks=hooks,
stream=stream,
verify=verify,
cert=cert,
json=json,
)
async def request(self, method, url, gather_request=True, *args, **kwargs):
self.gather_request = gather_request
response = await super().request(method, url, *args, **kwargs)
response.status = response.status_code
response.body = response.content
try:
response.json = response.json()
except:
response.json = None
if gather_request:
request = response.request
parsed = urlparse(request.url)
request.scheme = parsed.scheme
request.path = parsed.path
request.args = parse_qs(parsed.query)
if hasattr(response, "return_value"):
request = response.return_value
del response.return_value
return request, response
return response
def get(self, *args, **kwargs):
if 'uri' in kwargs:
kwargs['url'] = kwargs.pop('uri')
return self.request("get", *args, **kwargs)
def merge_environment_settings(self, *args, **kwargs):
settings = super().merge_environment_settings(*args, **kwargs)
settings.update({"gather_return": self.gather_request})
return settings
def post(self, *args, **kwargs):
if 'uri' in kwargs:
kwargs['url'] = kwargs.pop('uri')
return self.request("post", *args, **kwargs)
def put(self, *args, **kwargs):
if 'uri' in kwargs:
kwargs['url'] = kwargs.pop('uri')
return self.request("put", *args, **kwargs)
# class SanicASGITestClient(requests.ASGISession):
# __test__ = False # For pytest to not discover this up.
def delete(self, *args, **kwargs):
if 'uri' in kwargs:
kwargs['url'] = kwargs.pop('uri')
return self.request("delete", *args, **kwargs)
# def __init__(
# self,
# app: "Sanic",
# base_url: str = "http://mockserver",
# suppress_exceptions: bool = False,
# ) -> None:
# app.testing = True
# super().__init__(
# app, base_url=base_url, suppress_exceptions=suppress_exceptions
# )
# # adapter = _ASGIAdapter(
# # app, raise_server_exceptions=raise_server_exceptions
# # )
# # self.mount("http://", adapter)
# # self.mount("https://", adapter)
# # self.mount("ws://", adapter)
# # self.mount("wss://", adapter)
# # self.headers.update({"user-agent": "testclient"})
# # self.base_url = base_url
def patch(self, *args, **kwargs):
if 'uri' in kwargs:
kwargs['url'] = kwargs.pop('uri')
return self.request("patch", *args, **kwargs)
# # def request(
# # self,
# # method: str,
# # url: str = "/",
# # params: typing.Any = None,
# # data: typing.Any = None,
# # headers: typing.MutableMapping[str, str] = None,
# # cookies: typing.Any = None,
# # files: typing.Any = None,
# # auth: typing.Any = None,
# # timeout: typing.Any = None,
# # allow_redirects: bool = None,
# # proxies: typing.MutableMapping[str, str] = None,
# # hooks: typing.Any = None,
# # stream: bool = None,
# # verify: typing.Union[bool, str] = None,
# # cert: typing.Union[str, typing.Tuple[str, str]] = None,
# # json: typing.Any = None,
# # debug=None,
# # gather_request=True,
# # ) -> requests.Response:
# # if debug is not None:
# # self.app.debug = debug
def options(self, *args, **kwargs):
if 'uri' in kwargs:
kwargs['url'] = kwargs.pop('uri')
return self.request("options", *args, **kwargs)
# # url = urljoin(self.base_url, url)
# # response = super().request(
# # method,
# # url,
# # params=params,
# # data=data,
# # headers=headers,
# # cookies=cookies,
# # files=files,
# # auth=auth,
# # timeout=timeout,
# # allow_redirects=allow_redirects,
# # proxies=proxies,
# # hooks=hooks,
# # stream=stream,
# # verify=verify,
# # cert=cert,
# # json=json,
# # )
def head(self, *args, **kwargs):
return self._sanic_endpoint_test("head", *args, **kwargs)
# # response.status = response.status_code
# # response.body = response.content
# # try:
# # response.json = response.json()
# # except:
# # response.json = None
def websocket(self, *args, **kwargs):
return self._sanic_endpoint_test("websocket", *args, **kwargs)
# # if gather_request:
# # request = response.request
# # parsed = urlparse(request.url)
# # request.scheme = parsed.scheme
# # request.path = parsed.path
# # request.args = parse_qs(parsed.query)
# # return request, response
# # return response
# # def get(self, *args, **kwargs):
# # if "uri" in kwargs:
# # kwargs["url"] = kwargs.pop("uri")
# # return self.request("get", *args, **kwargs)
# # def post(self, *args, **kwargs):
# # if "uri" in kwargs:
# # kwargs["url"] = kwargs.pop("uri")
# # return self.request("post", *args, **kwargs)
# # def put(self, *args, **kwargs):
# # if "uri" in kwargs:
# # kwargs["url"] = kwargs.pop("uri")
# # return self.request("put", *args, **kwargs)
# # def delete(self, *args, **kwargs):
# # if "uri" in kwargs:
# # kwargs["url"] = kwargs.pop("uri")
# # return self.request("delete", *args, **kwargs)
# # def patch(self, *args, **kwargs):
# # if "uri" in kwargs:
# # kwargs["url"] = kwargs.pop("uri")
# # return self.request("patch", *args, **kwargs)
# # def options(self, *args, **kwargs):
# # if "uri" in kwargs:
# # kwargs["url"] = kwargs.pop("uri")
# # return self.request("options", *args, **kwargs)
# # def head(self, *args, **kwargs):
# # return self._sanic_endpoint_test("head", *args, **kwargs)
# # def websocket(self, *args, **kwargs):
# # return self._sanic_endpoint_test("websocket", *args, **kwargs)

956
tests/test_asgi.py Normal file
View File

@ -0,0 +1,956 @@
import pytest
from sanic.testing import SanicASGITestClient
from sanic.response import text
def asgi_client_instantiation(app):
assert isinstance(app.asgi_client, SanicASGITestClient)
# import logging
# import os
# import ssl
# from json import dumps as json_dumps
# from json import loads as json_loads
# from urllib.parse import urlparse
# import pytest
# from sanic import Blueprint, Sanic
# from sanic.exceptions import ServerError
# from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, RequestParameters
# from sanic.response import json, text
# from sanic.testing import HOST, PORT
# ------------------------------------------------------------ #
# GET - Adapted from test_requests.py
# ------------------------------------------------------------ #
@pytest.mark.asyncio
async def test_basic_request(app):
@app.route("/")
def handler(request):
return text("Hello")
_, response = await app.asgi_client.get("/")
assert response.text == "Hello"
@pytest.mark.asyncio
async def test_ip(app):
@app.route("/")
def handler(request):
return text("{}".format(request.ip))
request, response = await app.asgi_client.get("/")
assert response.text == "mockserver"
@pytest.mark.asyncio
def test_text(app):
@app.route("/")
async def handler(request):
return text("Hello")
request, response = await app.asgi_client.get("/")
assert response.text == "Hello"
# def test_headers(app):
# @app.route("/")
# async def handler(request):
# headers = {"spam": "great"}
# return text("Hello", headers=headers)
# request, response = app.asgi_client.get("/")
# assert response.headers.get("spam") == "great"
# def test_non_str_headers(app):
# @app.route("/")
# async def handler(request):
# headers = {"answer": 42}
# return text("Hello", headers=headers)
# request, response = app.asgi_client.get("/")
# assert response.headers.get("answer") == "42"
# def test_invalid_response(app):
# @app.exception(ServerError)
# def handler_exception(request, exception):
# return text("Internal Server Error.", 500)
# @app.route("/")
# async def handler(request):
# return "This should fail"
# request, response = app.asgi_client.get("/")
# assert response.status == 500
# assert response.text == "Internal Server Error."
# def test_json(app):
# @app.route("/")
# async def handler(request):
# return json({"test": True})
# request, response = app.asgi_client.get("/")
# results = json_loads(response.text)
# assert results.get("test") is True
# def test_empty_json(app):
# @app.route("/")
# async def handler(request):
# assert request.json is None
# return json(request.json)
# request, response = app.asgi_client.get("/")
# assert response.status == 200
# assert response.text == "null"
# def test_invalid_json(app):
# @app.route("/")
# async def handler(request):
# return json(request.json)
# data = "I am not json"
# request, response = app.asgi_client.get("/", data=data)
# assert response.status == 400
# def test_query_string(app):
# @app.route("/")
# async def handler(request):
# return text("OK")
# request, response = app.asgi_client.get(
# "/", params=[("test1", "1"), ("test2", "false"), ("test2", "true")]
# )
# assert request.args.get("test1") == "1"
# assert request.args.get("test2") == "false"
# assert request.args.getlist("test2") == ["false", "true"]
# assert request.args.getlist("test1") == ["1"]
# assert request.args.get("test3", default="My value") == "My value"
# def test_uri_template(app):
# @app.route("/foo/<id:int>/bar/<name:[A-z]+>")
# async def handler(request, id, name):
# return text("OK")
# request, response = app.asgi_client.get("/foo/123/bar/baz")
# assert request.uri_template == "/foo/<id:int>/bar/<name:[A-z]+>"
# def test_token(app):
# @app.route("/")
# async def handler(request):
# return text("OK")
# # uuid4 generated token.
# token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
# headers = {
# "content-type": "application/json",
# "Authorization": "{}".format(token),
# }
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.token == token
# token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
# headers = {
# "content-type": "application/json",
# "Authorization": "Token {}".format(token),
# }
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.token == token
# token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
# headers = {
# "content-type": "application/json",
# "Authorization": "Bearer {}".format(token),
# }
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.token == token
# # no Authorization headers
# headers = {"content-type": "application/json"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.token is None
# def test_content_type(app):
# @app.route("/")
# async def handler(request):
# return text(request.content_type)
# request, response = app.asgi_client.get("/")
# assert request.content_type == DEFAULT_HTTP_CONTENT_TYPE
# assert response.text == DEFAULT_HTTP_CONTENT_TYPE
# headers = {"content-type": "application/json"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.content_type == "application/json"
# assert response.text == "application/json"
# def test_remote_addr_with_two_proxies(app):
# app.config.PROXIES_COUNT = 2
# @app.route("/")
# async def handler(request):
# return text(request.remote_addr)
# headers = {"X-Real-IP": "127.0.0.2", "X-Forwarded-For": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.0.2"
# assert response.text == "127.0.0.2"
# headers = {"X-Forwarded-For": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == ""
# assert response.text == ""
# headers = {"X-Forwarded-For": "127.0.0.1, 127.0.1.2"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.0.1"
# assert response.text == "127.0.0.1"
# request, response = app.asgi_client.get("/")
# assert request.remote_addr == ""
# assert response.text == ""
# headers = {"X-Forwarded-For": "127.0.0.1, , ,,127.0.1.2"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.0.1"
# assert response.text == "127.0.0.1"
# headers = {
# "X-Forwarded-For": ", 127.0.2.2, , ,127.0.0.1, , ,,127.0.1.2"
# }
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.0.1"
# assert response.text == "127.0.0.1"
# def test_remote_addr_with_infinite_number_of_proxies(app):
# app.config.PROXIES_COUNT = -1
# @app.route("/")
# async def handler(request):
# return text(request.remote_addr)
# headers = {"X-Real-IP": "127.0.0.2", "X-Forwarded-For": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.0.2"
# assert response.text == "127.0.0.2"
# headers = {"X-Forwarded-For": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.1.1"
# assert response.text == "127.0.1.1"
# headers = {
# "X-Forwarded-For": "127.0.0.5, 127.0.0.4, 127.0.0.3, 127.0.0.2, 127.0.0.1"
# }
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.0.5"
# assert response.text == "127.0.0.5"
# def test_remote_addr_without_proxy(app):
# app.config.PROXIES_COUNT = 0
# @app.route("/")
# async def handler(request):
# return text(request.remote_addr)
# headers = {"X-Real-IP": "127.0.0.2", "X-Forwarded-For": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == ""
# assert response.text == ""
# headers = {"X-Forwarded-For": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == ""
# assert response.text == ""
# headers = {"X-Forwarded-For": "127.0.0.1, 127.0.1.2"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == ""
# assert response.text == ""
# def test_remote_addr_custom_headers(app):
# app.config.PROXIES_COUNT = 1
# app.config.REAL_IP_HEADER = "Client-IP"
# app.config.FORWARDED_FOR_HEADER = "Forwarded"
# @app.route("/")
# async def handler(request):
# return text(request.remote_addr)
# headers = {"X-Real-IP": "127.0.0.2", "Forwarded": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.1.1"
# assert response.text == "127.0.1.1"
# headers = {"X-Forwarded-For": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == ""
# assert response.text == ""
# headers = {"Client-IP": "127.0.0.2", "Forwarded": "127.0.1.1"}
# request, response = app.asgi_client.get("/", headers=headers)
# assert request.remote_addr == "127.0.0.2"
# assert response.text == "127.0.0.2"
# def test_match_info(app):
# @app.route("/api/v1/user/<user_id>/")
# async def handler(request, user_id):
# return json(request.match_info)
# request, response = app.asgi_client.get("/api/v1/user/sanic_user/")
# assert request.match_info == {"user_id": "sanic_user"}
# assert json_loads(response.text) == {"user_id": "sanic_user"}
# # ------------------------------------------------------------ #
# # POST
# # ------------------------------------------------------------ #
# def test_post_json(app):
# @app.route("/", methods=["POST"])
# async def handler(request):
# return text("OK")
# payload = {"test": "OK"}
# headers = {"content-type": "application/json"}
# request, response = app.asgi_client.post(
# "/", data=json_dumps(payload), headers=headers
# )
# assert request.json.get("test") == "OK"
# assert request.json.get("test") == "OK" # for request.parsed_json
# assert response.text == "OK"
# def test_post_form_urlencoded(app):
# @app.route("/", methods=["POST"])
# async def handler(request):
# return text("OK")
# payload = "test=OK"
# headers = {"content-type": "application/x-www-form-urlencoded"}
# request, response = app.asgi_client.post(
# "/", data=payload, headers=headers
# )
# assert request.form.get("test") == "OK"
# assert request.form.get("test") == "OK" # For request.parsed_form
# @pytest.mark.parametrize(
# "payload",
# [
# "------sanic\r\n"
# 'Content-Disposition: form-data; name="test"\r\n'
# "\r\n"
# "OK\r\n"
# "------sanic--\r\n",
# "------sanic\r\n"
# 'content-disposition: form-data; name="test"\r\n'
# "\r\n"
# "OK\r\n"
# "------sanic--\r\n",
# ],
# )
# def test_post_form_multipart_form_data(app, payload):
# @app.route("/", methods=["POST"])
# async def handler(request):
# return text("OK")
# headers = {"content-type": "multipart/form-data; boundary=----sanic"}
# request, response = app.asgi_client.post(data=payload, headers=headers)
# assert request.form.get("test") == "OK"
# @pytest.mark.parametrize(
# "path,query,expected_url",
# [
# ("/foo", "", "http://{}:{}/foo"),
# ("/bar/baz", "", "http://{}:{}/bar/baz"),
# ("/moo/boo", "arg1=val1", "http://{}:{}/moo/boo?arg1=val1"),
# ],
# )
# def test_url_attributes_no_ssl(app, path, query, expected_url):
# async def handler(request):
# return text("OK")
# app.add_route(handler, path)
# request, response = app.asgi_client.get(path + "?{}".format(query))
# assert request.url == expected_url.format(HOST, PORT)
# parsed = urlparse(request.url)
# assert parsed.scheme == request.scheme
# assert parsed.path == request.path
# assert parsed.query == request.query_string
# assert parsed.netloc == request.host
# @pytest.mark.parametrize(
# "path,query,expected_url",
# [
# ("/foo", "", "https://{}:{}/foo"),
# ("/bar/baz", "", "https://{}:{}/bar/baz"),
# ("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"),
# ],
# )
# def test_url_attributes_with_ssl_context(app, path, query, expected_url):
# current_dir = os.path.dirname(os.path.realpath(__file__))
# context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
# context.load_cert_chain(
# os.path.join(current_dir, "certs/selfsigned.cert"),
# keyfile=os.path.join(current_dir, "certs/selfsigned.key"),
# )
# async def handler(request):
# return text("OK")
# app.add_route(handler, path)
# request, response = app.asgi_client.get(
# "https://{}:{}".format(HOST, PORT) + path + "?{}".format(query),
# server_kwargs={"ssl": context},
# )
# assert request.url == expected_url.format(HOST, PORT)
# parsed = urlparse(request.url)
# assert parsed.scheme == request.scheme
# assert parsed.path == request.path
# assert parsed.query == request.query_string
# assert parsed.netloc == request.host
# @pytest.mark.parametrize(
# "path,query,expected_url",
# [
# ("/foo", "", "https://{}:{}/foo"),
# ("/bar/baz", "", "https://{}:{}/bar/baz"),
# ("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"),
# ],
# )
# def test_url_attributes_with_ssl_dict(app, path, query, expected_url):
# current_dir = os.path.dirname(os.path.realpath(__file__))
# ssl_cert = os.path.join(current_dir, "certs/selfsigned.cert")
# ssl_key = os.path.join(current_dir, "certs/selfsigned.key")
# ssl_dict = {"cert": ssl_cert, "key": ssl_key}
# async def handler(request):
# return text("OK")
# app.add_route(handler, path)
# request, response = app.asgi_client.get(
# "https://{}:{}".format(HOST, PORT) + path + "?{}".format(query),
# server_kwargs={"ssl": ssl_dict},
# )
# assert request.url == expected_url.format(HOST, PORT)
# parsed = urlparse(request.url)
# assert parsed.scheme == request.scheme
# assert parsed.path == request.path
# assert parsed.query == request.query_string
# assert parsed.netloc == request.host
# def test_invalid_ssl_dict(app):
# @app.get("/test")
# async def handler(request):
# return text("ssl test")
# ssl_dict = {"cert": None, "key": None}
# with pytest.raises(ValueError) as excinfo:
# request, response = app.asgi_client.get(
# "/test", server_kwargs={"ssl": ssl_dict}
# )
# assert str(excinfo.value) == "SSLContext or certificate and key required."
# def test_form_with_multiple_values(app):
# @app.route("/", methods=["POST"])
# async def handler(request):
# return text("OK")
# payload = "selectedItems=v1&selectedItems=v2&selectedItems=v3"
# headers = {"content-type": "application/x-www-form-urlencoded"}
# request, response = app.asgi_client.post(
# "/", data=payload, headers=headers
# )
# assert request.form.getlist("selectedItems") == ["v1", "v2", "v3"]
# def test_request_string_representation(app):
# @app.route("/", methods=["GET"])
# async def get(request):
# return text("OK")
# request, _ = app.asgi_client.get("/")
# assert repr(request) == "<Request: GET />"
# @pytest.mark.parametrize(
# "payload,filename",
# [
# (
# "------sanic\r\n"
# 'Content-Disposition: form-data; filename="filename"; name="test"\r\n'
# "\r\n"
# "OK\r\n"
# "------sanic--\r\n",
# "filename",
# ),
# (
# "------sanic\r\n"
# 'content-disposition: form-data; filename="filename"; name="test"\r\n'
# "\r\n"
# 'content-type: application/json; {"field": "value"}\r\n'
# "------sanic--\r\n",
# "filename",
# ),
# (
# "------sanic\r\n"
# 'Content-Disposition: form-data; filename=""; name="test"\r\n'
# "\r\n"
# "OK\r\n"
# "------sanic--\r\n",
# "",
# ),
# (
# "------sanic\r\n"
# 'content-disposition: form-data; filename=""; name="test"\r\n'
# "\r\n"
# 'content-type: application/json; {"field": "value"}\r\n'
# "------sanic--\r\n",
# "",
# ),
# (
# "------sanic\r\n"
# 'Content-Disposition: form-data; filename*="utf-8\'\'filename_%C2%A0_test"; name="test"\r\n'
# "\r\n"
# "OK\r\n"
# "------sanic--\r\n",
# "filename_\u00A0_test",
# ),
# (
# "------sanic\r\n"
# 'content-disposition: form-data; filename*="utf-8\'\'filename_%C2%A0_test"; name="test"\r\n'
# "\r\n"
# 'content-type: application/json; {"field": "value"}\r\n'
# "------sanic--\r\n",
# "filename_\u00A0_test",
# ),
# ],
# )
# def test_request_multipart_files(app, payload, filename):
# @app.route("/", methods=["POST"])
# async def post(request):
# return text("OK")
# headers = {"content-type": "multipart/form-data; boundary=----sanic"}
# request, _ = app.asgi_client.post(data=payload, headers=headers)
# assert request.files.get("test").name == filename
# def test_request_multipart_file_with_json_content_type(app):
# @app.route("/", methods=["POST"])
# async def post(request):
# return text("OK")
# payload = (
# "------sanic\r\n"
# 'Content-Disposition: form-data; name="file"; filename="test.json"\r\n'
# "Content-Type: application/json\r\n"
# "Content-Length: 0"
# "\r\n"
# "\r\n"
# "------sanic--"
# )
# headers = {"content-type": "multipart/form-data; boundary=------sanic"}
# request, _ = app.asgi_client.post(data=payload, headers=headers)
# assert request.files.get("file").type == "application/json"
# def test_request_multipart_file_without_field_name(app, caplog):
# @app.route("/", methods=["POST"])
# async def post(request):
# return text("OK")
# payload = (
# '------sanic\r\nContent-Disposition: form-data; filename="test.json"'
# "\r\nContent-Type: application/json\r\n\r\n\r\n------sanic--"
# )
# headers = {"content-type": "multipart/form-data; boundary=------sanic"}
# request, _ = app.asgi_client.post(
# data=payload, headers=headers, debug=True
# )
# with caplog.at_level(logging.DEBUG):
# request.form
# assert caplog.record_tuples[-1] == (
# "sanic.root",
# logging.DEBUG,
# "Form-data field does not have a 'name' parameter "
# "in the Content-Disposition header",
# )
# def test_request_multipart_file_duplicate_filed_name(app):
# @app.route("/", methods=["POST"])
# async def post(request):
# return text("OK")
# payload = (
# "--e73ffaa8b1b2472b8ec848de833cb05b\r\n"
# 'Content-Disposition: form-data; name="file"\r\n'
# "Content-Type: application/octet-stream\r\n"
# "Content-Length: 15\r\n"
# "\r\n"
# '{"test":"json"}\r\n'
# "--e73ffaa8b1b2472b8ec848de833cb05b\r\n"
# 'Content-Disposition: form-data; name="file"\r\n'
# "Content-Type: application/octet-stream\r\n"
# "Content-Length: 15\r\n"
# "\r\n"
# '{"test":"json2"}\r\n'
# "--e73ffaa8b1b2472b8ec848de833cb05b--\r\n"
# )
# headers = {
# "Content-Type": "multipart/form-data; boundary=e73ffaa8b1b2472b8ec848de833cb05b"
# }
# request, _ = app.asgi_client.post(
# data=payload, headers=headers, debug=True
# )
# assert request.form.getlist("file") == [
# '{"test":"json"}',
# '{"test":"json2"}',
# ]
# def test_request_multipart_with_multiple_files_and_type(app):
# @app.route("/", methods=["POST"])
# async def post(request):
# return text("OK")
# payload = (
# '------sanic\r\nContent-Disposition: form-data; name="file"; filename="test.json"'
# "\r\nContent-Type: application/json\r\n\r\n\r\n"
# '------sanic\r\nContent-Disposition: form-data; name="file"; filename="some_file.pdf"\r\n'
# "Content-Type: application/pdf\r\n\r\n\r\n------sanic--"
# )
# headers = {"content-type": "multipart/form-data; boundary=------sanic"}
# request, _ = app.asgi_client.post(data=payload, headers=headers)
# assert len(request.files.getlist("file")) == 2
# assert request.files.getlist("file")[0].type == "application/json"
# assert request.files.getlist("file")[1].type == "application/pdf"
# def test_request_repr(app):
# @app.get("/")
# def handler(request):
# return text("pass")
# request, response = app.asgi_client.get("/")
# assert repr(request) == "<Request: GET />"
# request.method = None
# assert repr(request) == "<Request: None />"
# def test_request_bool(app):
# @app.get("/")
# def handler(request):
# return text("pass")
# request, response = app.asgi_client.get("/")
# assert bool(request)
# request.transport = False
# assert not bool(request)
# def test_request_parsing_form_failed(app, caplog):
# @app.route("/", methods=["POST"])
# async def handler(request):
# return text("OK")
# payload = "test=OK"
# headers = {"content-type": "multipart/form-data"}
# request, response = app.asgi_client.post(
# "/", data=payload, headers=headers
# )
# with caplog.at_level(logging.ERROR):
# request.form
# assert caplog.record_tuples[-1] == (
# "sanic.error",
# logging.ERROR,
# "Failed when parsing form",
# )
# def test_request_args_no_query_string(app):
# @app.get("/")
# def handler(request):
# return text("pass")
# request, response = app.asgi_client.get("/")
# assert request.args == {}
# def test_request_raw_args(app):
# params = {"test": "OK"}
# @app.get("/")
# def handler(request):
# return text("pass")
# request, response = app.asgi_client.get("/", params=params)
# assert request.raw_args == params
# def test_request_query_args(app):
# # test multiple params with the same key
# params = [("test", "value1"), ("test", "value2")]
# @app.get("/")
# def handler(request):
# return text("pass")
# request, response = app.asgi_client.get("/", params=params)
# assert request.query_args == params
# # test cached value
# assert (
# request.parsed_not_grouped_args[(False, False, "utf-8", "replace")]
# == request.query_args
# )
# # test params directly in the url
# request, response = app.asgi_client.get("/?test=value1&test=value2")
# assert request.query_args == params
# # test unique params
# params = [("test1", "value1"), ("test2", "value2")]
# request, response = app.asgi_client.get("/", params=params)
# assert request.query_args == params
# # test no params
# request, response = app.asgi_client.get("/")
# assert not request.query_args
# def test_request_query_args_custom_parsing(app):
# @app.get("/")
# def handler(request):
# return text("pass")
# request, response = app.asgi_client.get(
# "/?test1=value1&test2=&test3=value3"
# )
# assert request.get_query_args(keep_blank_values=True) == [
# ("test1", "value1"),
# ("test2", ""),
# ("test3", "value3"),
# ]
# assert request.query_args == [("test1", "value1"), ("test3", "value3")]
# assert request.get_query_args(keep_blank_values=False) == [
# ("test1", "value1"),
# ("test3", "value3"),
# ]
# assert request.get_args(keep_blank_values=True) == RequestParameters(
# {"test1": ["value1"], "test2": [""], "test3": ["value3"]}
# )
# assert request.args == RequestParameters(
# {"test1": ["value1"], "test3": ["value3"]}
# )
# assert request.get_args(keep_blank_values=False) == RequestParameters(
# {"test1": ["value1"], "test3": ["value3"]}
# )
# def test_request_cookies(app):
# cookies = {"test": "OK"}
# @app.get("/")
# def handler(request):
# return text("OK")
# request, response = app.asgi_client.get("/", cookies=cookies)
# assert request.cookies == cookies
# assert request.cookies == cookies # For request._cookies
# def test_request_cookies_without_cookies(app):
# @app.get("/")
# def handler(request):
# return text("OK")
# request, response = app.asgi_client.get("/")
# assert request.cookies == {}
# def test_request_port(app):
# @app.get("/")
# def handler(request):
# return text("OK")
# request, response = app.asgi_client.get("/")
# port = request.port
# assert isinstance(port, int)
# delattr(request, "_socket")
# delattr(request, "_port")
# port = request.port
# assert isinstance(port, int)
# assert hasattr(request, "_socket")
# assert hasattr(request, "_port")
# def test_request_socket(app):
# @app.get("/")
# def handler(request):
# return text("OK")
# request, response = app.asgi_client.get("/")
# socket = request.socket
# assert isinstance(socket, tuple)
# ip = socket[0]
# port = socket[1]
# assert ip == request.ip
# assert port == request.port
# delattr(request, "_socket")
# socket = request.socket
# assert isinstance(socket, tuple)
# assert hasattr(request, "_socket")
# def test_request_form_invalid_content_type(app):
# @app.route("/", methods=["POST"])
# async def post(request):
# return text("OK")
# request, response = app.asgi_client.post("/", json={"test": "OK"})
# assert request.form == {}
# def test_endpoint_basic():
# app = Sanic()
# @app.route("/")
# def my_unique_handler(request):
# return text("Hello")
# request, response = app.asgi_client.get("/")
# assert request.endpoint == "test_requests.my_unique_handler"
# def test_endpoint_named_app():
# app = Sanic("named")
# @app.route("/")
# def my_unique_handler(request):
# return text("Hello")
# request, response = app.asgi_client.get("/")
# assert request.endpoint == "named.my_unique_handler"
# def test_endpoint_blueprint():
# bp = Blueprint("my_blueprint", url_prefix="/bp")
# @bp.route("/")
# async def bp_root(request):
# return text("Hello")
# app = Sanic("named")
# app.blueprint(bp)
# request, response = app.asgi_client.get("/bp")
# assert request.endpoint == "named.my_blueprint.bp_root"