diff --git a/sanic/app.py b/sanic/app.py index 39ad2ec5..5e309452 100644 --- a/sanic/app.py +++ b/sanic/app.py @@ -16,6 +16,7 @@ from typing import Any, Optional, Type, Union from urllib.parse import urlencode, urlunparse from sanic import reloader_helpers +from sanic.asgi import ASGIApp from sanic.blueprint_group import BlueprintGroup from sanic.config import BASE_LOGO, Config from sanic.constants import HTTP_METHODS @@ -27,7 +28,7 @@ from sanic.request import Request from sanic.router import Router from sanic.server import HttpProtocol, Signal, serve, serve_multiple from sanic.static import register as static_register -from sanic.testing import SanicTestClient +from sanic.testing import SanicTestClient, SanicASGITestClient from sanic.views import CompositionView from sanic.websocket import ConnectionClosed, WebSocketProtocol @@ -981,7 +982,9 @@ class Sanic: raise CancelledError() # pass the response to the correct callback - if write_callback is None or isinstance(response, StreamingHTTPResponse): + if write_callback is None or isinstance( + response, StreamingHTTPResponse + ): await stream_callback(response) else: write_callback(response) @@ -994,6 +997,10 @@ class Sanic: def test_client(self): return SanicTestClient(self) + @property + def asgi_client(self): + return SanicASGITestClient(self) + # -------------------------------------------------------------------- # # Execution # -------------------------------------------------------------------- # @@ -1120,9 +1127,6 @@ class Sanic: """This kills the Sanic""" get_event_loop().stop() - def __call__(self, scope): - return ASGIApp(self, scope) - async def create_server( self, host: Optional[str] = None, @@ -1365,79 +1369,10 @@ class Sanic: parts = [self.name, *parts] return ".".join(parts) + # -------------------------------------------------------------------- # + # ASGI + # -------------------------------------------------------------------- # -class MockTransport: - def __init__(self, scope): - self.scope = scope - - def get_extra_info(self, info): - if info == 'peername': - return self.scope.get('server') - elif info == 'sslcontext': - return self.scope.get('scheme') in ["https", "wss"] - -class ASGIApp: - def __init__(self, sanic_app, scope): - self.sanic_app = sanic_app - url_bytes = scope.get('root_path', '') + scope['path'] - url_bytes = url_bytes.encode('latin-1') - url_bytes += scope['query_string'] - headers = CIMultiDict([ - (key.decode('latin-1'), value.decode('latin-1')) - for key, value in scope.get('headers', []) - ]) - version = scope['http_version'] - method = scope['method'] - self.request = Request(url_bytes, headers, version, method, MockTransport(scope)) - self.request.app = sanic_app - - async def read_body(self, receive): - """ - Read and return the entire body from an incoming ASGI message. - """ - body = b'' - more_body = True - - while more_body: - message = await receive() - body += message.get('body', b'') - more_body = message.get('more_body', False) - - return body - - async def __call__(self, receive, send): - """ - Handle the incoming request. - """ - self.send = send - self.request.body = await self.read_body(receive) - handler = self.sanic_app.handle_request - await handler(self.request, None, self.stream_callback) - - async def stream_callback(self, response): - """ - Write the response. - """ - if isinstance(response, StreamingHTTPResponse): - raise NotImplementedError('Not supported') - - headers = [ - (str(name).encode('latin-1'), str(value).encode('latin-1')) - for name, value in response.headers.items() - ] - if 'content-length' not in response.headers: - headers += [( - b'content-length', - str(len(response.body)).encode('latin-1') - )] - - await self.send({ - 'type': 'http.response.start', - 'status': response.status, - 'headers': headers - }) - await self.send({ - 'type': 'http.response.body', - 'body': response.body, - 'more_body': False - }) + async def __call__(self, scope, receive, send): + asgi_app = ASGIApp(self, scope, receive, send) + await asgi_app() diff --git a/sanic/asgi.py b/sanic/asgi.py new file mode 100644 index 00000000..8e2693f4 --- /dev/null +++ b/sanic/asgi.py @@ -0,0 +1,93 @@ +from sanic.request import Request +from multidict import CIMultiDict +from sanic.response import StreamingHTTPResponse + + +class MockTransport: + def __init__(self, scope): + self.scope = scope + + def get_extra_info(self, info): + if info == "peername": + return self.scope.get("server") + elif info == "sslcontext": + return self.scope.get("scheme") in ["https", "wss"] + + +class ASGIApp: + def __init__(self, sanic_app, scope, receive, send): + self.sanic_app = sanic_app + self.receive = receive + self.send = send + url_bytes = scope.get("root_path", "") + scope["path"] + url_bytes = url_bytes.encode("latin-1") + url_bytes += scope["query_string"] + headers = CIMultiDict( + [ + (key.decode("latin-1"), value.decode("latin-1")) + for key, value in scope.get("headers", []) + ] + ) + version = scope["http_version"] + method = scope["method"] + self.request = Request( + url_bytes, + headers, + version, + method, + MockTransport(scope), + sanic_app, + ) + + async def read_body(self): + """ + Read and return the entire body from an incoming ASGI message. + """ + body = b"" + more_body = True + + while more_body: + message = await self.receive() + body += message.get("body", b"") + more_body = message.get("more_body", False) + + return body + + async def __call__(self): + """ + Handle the incoming request. + """ + self.request.body = await self.read_body() + handler = self.sanic_app.handle_request + await handler(self.request, None, self.stream_callback) + + async def stream_callback(self, response): + """ + Write the response. + """ + if isinstance(response, StreamingHTTPResponse): + raise NotImplementedError("Not supported") + + headers = [ + (str(name).encode("latin-1"), str(value).encode("latin-1")) + for name, value in response.headers.items() + ] + if "content-length" not in response.headers: + headers += [ + (b"content-length", str(len(response.body)).encode("latin-1")) + ] + + await self.send( + { + "type": "http.response.start", + "status": response.status, + "headers": headers, + } + ) + await self.send( + { + "type": "http.response.body", + "body": response.body, + "more_body": False, + } + ) diff --git a/sanic/testing.py b/sanic/testing.py index a3e492ad..77dd274d 100644 --- a/sanic/testing.py +++ b/sanic/testing.py @@ -1,42 +1,25 @@ - from json import JSONDecodeError from socket import socket +from urllib.parse import unquote, urljoin, urlsplit +import httpcore import requests_async as requests -import websockets - -import asyncio -import http -import io -import json -import queue -import threading -import types import typing -from urllib.parse import unquote, urljoin, urlparse, parse_qs - -import requests - -from starlette.types import ASGIApp, Message, Scope -from starlette.websockets import WebSocketDisconnect +import websockets +from sanic.asgi import ASGIApp +from sanic.exceptions import MethodNotSupported +from sanic.log import logger +from sanic.response import text HOST = "127.0.0.1" PORT = 42101 - class SanicTestClient: def __init__(self, app, port=PORT): """Use port=None to bind to a random port""" self.app = app - self.raise_server_exceptions = raise_server_exceptions - - def send( # type: ignore - self, request: requests.PreparedRequest, *args: typing.Any, **kwargs: typing.Any - ) -> requests.Response: - scheme, netloc, path, params, query, fragement = urlparse( # type: ignore - request.url - ) + self.port = port def get_new_session(self): return requests.Session() @@ -83,75 +66,27 @@ class SanicTestClient: debug=False, server_kwargs={"auto_reload": False}, *request_args, - **request_kwargs + **request_kwargs, ): results = [None, None] exceptions = [] - # Include other request headers. - headers += [ - (key.lower().encode(), value.encode()) - for key, value in request.headers.items() - ] + if gather_request: - if scheme in {"ws", "wss"}: - subprotocol = request.headers.get("sec-websocket-protocol", None) - if subprotocol is None: - subprotocols = [] # type: typing.Sequence[str] + def _collect_request(request): + if results[0] is None: + results[0] = request + + self.app.request_middleware.appendleft(_collect_request) + + @self.app.exception(MethodNotSupported) + async def error_handler(request, exception): + if request.method in ["HEAD", "PATCH", "PUT", "DELETE"]: + return text( + "", exception.status_code, headers=exception.headers + ) else: - subprotocols = [value.strip() for value in subprotocol.split(",")] - scope = { - "type": "websocket", - "path": unquote(path), - "root_path": "", - "scheme": scheme, - "query_string": query.encode(), - "headers": headers, - "client": ["testclient", 50000], - "server": [host, port], - "subprotocols": subprotocols, - } - session = WebSocketTestSession(self.app, scope) - raise _Upgrade(session) - - scope = { - "type": "http", - "http_version": "1.1", - "method": request.method, - "path": unquote(path), - "root_path": "", - "scheme": scheme, - "query_string": query.encode(), - "headers": headers, - "client": ["testclient", 50000], - "server": [host, port], - "extensions": {"http.response.template": {}}, - } - - async def receive() -> Message: - nonlocal request_complete, response_complete - - if request_complete: - while not response_complete: - await asyncio.sleep(0.0001) - return {"type": "http.disconnect"} - - body = request.body - if isinstance(body, str): - body_bytes = body.encode("utf-8") # type: bytes - elif body is None: - body_bytes = b"" - elif isinstance(body, types.GeneratorType): - try: - chunk = body.send(None) - if isinstance(chunk, str): - chunk = chunk.encode("utf-8") - return {"type": "http.request", "body": chunk, "more_body": True} - except StopIteration: - request_complete = True - return {"type": "http.request", "body": b""} - else: - body_bytes = body + return self.app.error_handler.default(request, exception) if self.port: server_kwargs = dict(host=HOST, port=self.port, **server_kwargs) @@ -179,6 +114,151 @@ class SanicTestClient: response = await self._local_request( method, url, *request_args, **request_kwargs ) + results[-1] = response + except Exception as e: + logger.exception("Exception") + exceptions.append(e) + self.app.stop() + + self.app.run(debug=debug, **server_kwargs) + self.app.listeners["after_server_start"].pop() + + if exceptions: + raise ValueError("Exception during request: {}".format(exceptions)) + + if gather_request: + try: + request, response = results + return request, response + except BaseException: + raise ValueError( + "Request and response object expected, got ({})".format( + results + ) + ) + else: + try: + return results[-1] + except BaseException: + raise ValueError( + "Request object expected, got ({})".format(results) + ) + + def get(self, *args, **kwargs): + return self._sanic_endpoint_test("get", *args, **kwargs) + + def post(self, *args, **kwargs): + return self._sanic_endpoint_test("post", *args, **kwargs) + + def put(self, *args, **kwargs): + return self._sanic_endpoint_test("put", *args, **kwargs) + + def delete(self, *args, **kwargs): + return self._sanic_endpoint_test("delete", *args, **kwargs) + + def patch(self, *args, **kwargs): + return self._sanic_endpoint_test("patch", *args, **kwargs) + + def options(self, *args, **kwargs): + return self._sanic_endpoint_test("options", *args, **kwargs) + + def head(self, *args, **kwargs): + return self._sanic_endpoint_test("head", *args, **kwargs) + + def websocket(self, *args, **kwargs): + return self._sanic_endpoint_test("websocket", *args, **kwargs) + + +class SanicASGIAdapter(requests.asgi.ASGIAdapter): + async def send( # type: ignore + self, + request: requests.PreparedRequest, + gather_return: bool = False, + *args: typing.Any, + **kwargs: typing.Any, + ) -> requests.Response: + scheme, netloc, path, query, fragment = urlsplit( + request.url + ) # type: ignore + + default_port = {"http": 80, "ws": 80, "https": 443, "wss": 443}[scheme] + + if ":" in netloc: + host, port_string = netloc.split(":", 1) + port = int(port_string) + else: + host = netloc + port = default_port + + # Include the 'host' header. + if "host" in request.headers: + headers = [] # type: typing.List[typing.Tuple[bytes, bytes]] + elif port == default_port: + headers = [(b"host", host.encode())] + else: + headers = [(b"host", (f"{host}:{port}").encode())] + + # Include other request headers. + headers += [ + (key.lower().encode(), value.encode()) + for key, value in request.headers.items() + ] + + scope = { + "type": "http", + "http_version": "1.1", + "method": request.method, + "path": unquote(path), + "root_path": "", + "scheme": scheme, + "query_string": query.encode(), + "headers": headers, + "client": ["testclient", 50000], + "server": [host, port], + "extensions": {"http.response.template": {}}, + } + + async def receive(): + nonlocal request_complete, response_complete + + if request_complete: + while not response_complete: + await asyncio.sleep(0.0001) + return {"type": "http.disconnect"} + + body = request.body + if isinstance(body, str): + body_bytes = body.encode("utf-8") # type: bytes + elif body is None: + body_bytes = b"" + elif isinstance(body, types.GeneratorType): + try: + chunk = body.send(None) + if isinstance(chunk, str): + chunk = chunk.encode("utf-8") + return { + "type": "http.request", + "body": chunk, + "more_body": True, + } + except StopIteration: + request_complete = True + return {"type": "http.request", "body": b""} + else: + body_bytes = body + + request_complete = True + return {"type": "http.request", "body": body_bytes} + + async def send(message) -> None: + nonlocal raw_kwargs, response_started, response_complete, template, context + + if message["type"] == "http.response.start": + assert ( + not response_started + ), 'Received multiple "http.response.start" messages.' + raw_kwargs["status_code"] = message["status"] + raw_kwargs["headers"] = message["headers"] response_started = True elif message["type"] == "http.response.body": assert ( @@ -190,9 +270,8 @@ class SanicTestClient: body = message.get("body", b"") more_body = message.get("more_body", False) if request.method != "HEAD": - raw_kwargs["body"].write(body) + raw_kwargs["body"] += body if not more_body: - raw_kwargs["body"].seek(0) response_complete = True elif message["type"] == "http.response.template": template = message["template"] @@ -201,155 +280,200 @@ class SanicTestClient: request_complete = False response_started = False response_complete = False - raw_kwargs = {"body": io.BytesIO()} # type: typing.Dict[str, typing.Any] + raw_kwargs = {"body": b""} # type: typing.Dict[str, typing.Any] template = None context = None + return_value = None - - self.app.run(debug=debug, **server_kwargs) - self.app.listeners["after_server_start"].pop() - - self.app.is_running = True try: - connection = self.app(scope) - loop.run_until_complete(connection(receive, send)) + return_value = await self.app(scope, receive, send) except BaseException as exc: - if self.raise_server_exceptions: + if not self.suppress_exceptions: raise exc from None - if self.raise_server_exceptions: + if not self.suppress_exceptions: assert response_started, "TestClient did not receive any response." elif not response_started: - raw_kwargs = { - "version": 11, - "status": 500, - "reason": "Internal Server Error", - "headers": [], - "preload_content": False, - "original_response": _MockOriginalResponse([]), - "body": io.BytesIO(), - } + raw_kwargs = {"status_code": 500, "headers": []} - raw = requests.packages.urllib3.HTTPResponse(**raw_kwargs) + raw = httpcore.Response(**raw_kwargs) response = self.build_response(request, raw) if template is not None: response.template = template response.context = context + + if gather_return: + response.return_value = return_value return response -class SanicTestClient(requests.Session): - __test__ = False # For pytest to not discover this up. +class TestASGIApp(ASGIApp): + async def __call__(self): + await super().__call__() + return self.request + +async def app_call_with_return(self, scope, receive, send): + asgi_app = TestASGIApp(self, scope, receive, send) + return await asgi_app() + + +class SanicASGITestClient(requests.ASGISession): def __init__( self, - app: ASGIApp, - base_url: str = "http://%s:%d" % (HOST, PORT), - raise_server_exceptions: bool = True, + app: "Sanic", + base_url: str = "http://mockserver", + suppress_exceptions: bool = False, ) -> None: - super(SanicTestClient, self).__init__() - adapter = _ASGIAdapter(app, raise_server_exceptions=raise_server_exceptions) + app.__class__.__call__ = app_call_with_return + + super().__init__(app) + + adapter = SanicASGIAdapter( + app, suppress_exceptions=suppress_exceptions + ) self.mount("http://", adapter) self.mount("https://", adapter) - self.mount("ws://", adapter) - self.mount("wss://", adapter) self.headers.update({"user-agent": "testclient"}) self.app = app self.base_url = base_url - def request( - self, - method: str, - url: str = '/', - params: Params = None, - data: DataType = None, - headers: typing.MutableMapping[str, str] = None, - cookies: Cookies = None, - files: FileType = None, - auth: AuthType = None, - timeout: TimeOut = None, - allow_redirects: bool = None, - proxies: typing.MutableMapping[str, str] = None, - hooks: typing.Any = None, - stream: bool = None, - verify: typing.Union[bool, str] = None, - cert: typing.Union[str, typing.Tuple[str, str]] = None, - json: typing.Any = None, - debug = None, - gather_request = True - ) -> requests.Response: - if debug is not None: - self.app.debug = debug + async def send(self, *args, **kwargs): + return await super().send(*args, **kwargs) - url = urljoin(self.base_url, url) - response = super().request( - method, - url, - params=params, - data=data, - headers=headers, - cookies=cookies, - files=files, - auth=auth, - timeout=timeout, - allow_redirects=allow_redirects, - proxies=proxies, - hooks=hooks, - stream=stream, - verify=verify, - cert=cert, - json=json, - ) + async def request(self, method, url, gather_request=True, *args, **kwargs): + self.gather_request = gather_request + response = await super().request(method, url, *args, **kwargs) - response.status = response.status_code - response.body = response.content - try: - response.json = response.json() - except: - response.json = None - - if gather_request: - request = response.request - parsed = urlparse(request.url) - request.scheme = parsed.scheme - request.path = parsed.path - request.args = parse_qs(parsed.query) + if hasattr(response, "return_value"): + request = response.return_value + del response.return_value return request, response return response - def get(self, *args, **kwargs): - if 'uri' in kwargs: - kwargs['url'] = kwargs.pop('uri') - return self.request("get", *args, **kwargs) + def merge_environment_settings(self, *args, **kwargs): + settings = super().merge_environment_settings(*args, **kwargs) + settings.update({"gather_return": self.gather_request}) + return settings - def post(self, *args, **kwargs): - if 'uri' in kwargs: - kwargs['url'] = kwargs.pop('uri') - return self.request("post", *args, **kwargs) - def put(self, *args, **kwargs): - if 'uri' in kwargs: - kwargs['url'] = kwargs.pop('uri') - return self.request("put", *args, **kwargs) +# class SanicASGITestClient(requests.ASGISession): +# __test__ = False # For pytest to not discover this up. - def delete(self, *args, **kwargs): - if 'uri' in kwargs: - kwargs['url'] = kwargs.pop('uri') - return self.request("delete", *args, **kwargs) +# def __init__( +# self, +# app: "Sanic", +# base_url: str = "http://mockserver", +# suppress_exceptions: bool = False, +# ) -> None: +# app.testing = True +# super().__init__( +# app, base_url=base_url, suppress_exceptions=suppress_exceptions +# ) +# # adapter = _ASGIAdapter( +# # app, raise_server_exceptions=raise_server_exceptions +# # ) +# # self.mount("http://", adapter) +# # self.mount("https://", adapter) +# # self.mount("ws://", adapter) +# # self.mount("wss://", adapter) +# # self.headers.update({"user-agent": "testclient"}) +# # self.base_url = base_url - def patch(self, *args, **kwargs): - if 'uri' in kwargs: - kwargs['url'] = kwargs.pop('uri') - return self.request("patch", *args, **kwargs) +# # def request( +# # self, +# # method: str, +# # url: str = "/", +# # params: typing.Any = None, +# # data: typing.Any = None, +# # headers: typing.MutableMapping[str, str] = None, +# # cookies: typing.Any = None, +# # files: typing.Any = None, +# # auth: typing.Any = None, +# # timeout: typing.Any = None, +# # allow_redirects: bool = None, +# # proxies: typing.MutableMapping[str, str] = None, +# # hooks: typing.Any = None, +# # stream: bool = None, +# # verify: typing.Union[bool, str] = None, +# # cert: typing.Union[str, typing.Tuple[str, str]] = None, +# # json: typing.Any = None, +# # debug=None, +# # gather_request=True, +# # ) -> requests.Response: +# # if debug is not None: +# # self.app.debug = debug - def options(self, *args, **kwargs): - if 'uri' in kwargs: - kwargs['url'] = kwargs.pop('uri') - return self.request("options", *args, **kwargs) +# # url = urljoin(self.base_url, url) +# # response = super().request( +# # method, +# # url, +# # params=params, +# # data=data, +# # headers=headers, +# # cookies=cookies, +# # files=files, +# # auth=auth, +# # timeout=timeout, +# # allow_redirects=allow_redirects, +# # proxies=proxies, +# # hooks=hooks, +# # stream=stream, +# # verify=verify, +# # cert=cert, +# # json=json, +# # ) - def head(self, *args, **kwargs): - return self._sanic_endpoint_test("head", *args, **kwargs) +# # response.status = response.status_code +# # response.body = response.content +# # try: +# # response.json = response.json() +# # except: +# # response.json = None - def websocket(self, *args, **kwargs): - return self._sanic_endpoint_test("websocket", *args, **kwargs) +# # if gather_request: +# # request = response.request +# # parsed = urlparse(request.url) +# # request.scheme = parsed.scheme +# # request.path = parsed.path +# # request.args = parse_qs(parsed.query) +# # return request, response + +# # return response + +# # def get(self, *args, **kwargs): +# # if "uri" in kwargs: +# # kwargs["url"] = kwargs.pop("uri") +# # return self.request("get", *args, **kwargs) + +# # def post(self, *args, **kwargs): +# # if "uri" in kwargs: +# # kwargs["url"] = kwargs.pop("uri") +# # return self.request("post", *args, **kwargs) + +# # def put(self, *args, **kwargs): +# # if "uri" in kwargs: +# # kwargs["url"] = kwargs.pop("uri") +# # return self.request("put", *args, **kwargs) + +# # def delete(self, *args, **kwargs): +# # if "uri" in kwargs: +# # kwargs["url"] = kwargs.pop("uri") +# # return self.request("delete", *args, **kwargs) + +# # def patch(self, *args, **kwargs): +# # if "uri" in kwargs: +# # kwargs["url"] = kwargs.pop("uri") +# # return self.request("patch", *args, **kwargs) + +# # def options(self, *args, **kwargs): +# # if "uri" in kwargs: +# # kwargs["url"] = kwargs.pop("uri") +# # return self.request("options", *args, **kwargs) + +# # def head(self, *args, **kwargs): +# # return self._sanic_endpoint_test("head", *args, **kwargs) + +# # def websocket(self, *args, **kwargs): +# # return self._sanic_endpoint_test("websocket", *args, **kwargs) diff --git a/tests/test_asgi.py b/tests/test_asgi.py new file mode 100644 index 00000000..fcda40de --- /dev/null +++ b/tests/test_asgi.py @@ -0,0 +1,956 @@ +import pytest + +from sanic.testing import SanicASGITestClient +from sanic.response import text + + +def asgi_client_instantiation(app): + assert isinstance(app.asgi_client, SanicASGITestClient) + + +# import logging +# import os +# import ssl + +# from json import dumps as json_dumps +# from json import loads as json_loads +# from urllib.parse import urlparse + +# import pytest + +# from sanic import Blueprint, Sanic +# from sanic.exceptions import ServerError +# from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, RequestParameters +# from sanic.response import json, text +# from sanic.testing import HOST, PORT + + +# ------------------------------------------------------------ # +# GET - Adapted from test_requests.py +# ------------------------------------------------------------ # + + +@pytest.mark.asyncio +async def test_basic_request(app): + @app.route("/") + def handler(request): + return text("Hello") + + _, response = await app.asgi_client.get("/") + assert response.text == "Hello" + + +@pytest.mark.asyncio +async def test_ip(app): + @app.route("/") + def handler(request): + return text("{}".format(request.ip)) + + request, response = await app.asgi_client.get("/") + + assert response.text == "mockserver" + + +@pytest.mark.asyncio +def test_text(app): + @app.route("/") + async def handler(request): + return text("Hello") + + request, response = await app.asgi_client.get("/") + + assert response.text == "Hello" + + +# def test_headers(app): +# @app.route("/") +# async def handler(request): +# headers = {"spam": "great"} +# return text("Hello", headers=headers) + +# request, response = app.asgi_client.get("/") + +# assert response.headers.get("spam") == "great" + + +# def test_non_str_headers(app): +# @app.route("/") +# async def handler(request): +# headers = {"answer": 42} +# return text("Hello", headers=headers) + +# request, response = app.asgi_client.get("/") + +# assert response.headers.get("answer") == "42" + + +# def test_invalid_response(app): +# @app.exception(ServerError) +# def handler_exception(request, exception): +# return text("Internal Server Error.", 500) + +# @app.route("/") +# async def handler(request): +# return "This should fail" + +# request, response = app.asgi_client.get("/") +# assert response.status == 500 +# assert response.text == "Internal Server Error." + + +# def test_json(app): +# @app.route("/") +# async def handler(request): +# return json({"test": True}) + +# request, response = app.asgi_client.get("/") + +# results = json_loads(response.text) + +# assert results.get("test") is True + + +# def test_empty_json(app): +# @app.route("/") +# async def handler(request): +# assert request.json is None +# return json(request.json) + +# request, response = app.asgi_client.get("/") +# assert response.status == 200 +# assert response.text == "null" + + +# def test_invalid_json(app): +# @app.route("/") +# async def handler(request): +# return json(request.json) + +# data = "I am not json" +# request, response = app.asgi_client.get("/", data=data) + +# assert response.status == 400 + + +# def test_query_string(app): +# @app.route("/") +# async def handler(request): +# return text("OK") + +# request, response = app.asgi_client.get( +# "/", params=[("test1", "1"), ("test2", "false"), ("test2", "true")] +# ) + +# assert request.args.get("test1") == "1" +# assert request.args.get("test2") == "false" +# assert request.args.getlist("test2") == ["false", "true"] +# assert request.args.getlist("test1") == ["1"] +# assert request.args.get("test3", default="My value") == "My value" + + +# def test_uri_template(app): +# @app.route("/foo//bar/") +# async def handler(request, id, name): +# return text("OK") + +# request, response = app.asgi_client.get("/foo/123/bar/baz") +# assert request.uri_template == "/foo//bar/" + + +# def test_token(app): +# @app.route("/") +# async def handler(request): +# return text("OK") + +# # uuid4 generated token. +# token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf" +# headers = { +# "content-type": "application/json", +# "Authorization": "{}".format(token), +# } + +# request, response = app.asgi_client.get("/", headers=headers) + +# assert request.token == token + +# token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf" +# headers = { +# "content-type": "application/json", +# "Authorization": "Token {}".format(token), +# } + +# request, response = app.asgi_client.get("/", headers=headers) + +# assert request.token == token + +# token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf" +# headers = { +# "content-type": "application/json", +# "Authorization": "Bearer {}".format(token), +# } + +# request, response = app.asgi_client.get("/", headers=headers) + +# assert request.token == token + +# # no Authorization headers +# headers = {"content-type": "application/json"} + +# request, response = app.asgi_client.get("/", headers=headers) + +# assert request.token is None + + +# def test_content_type(app): +# @app.route("/") +# async def handler(request): +# return text(request.content_type) + +# request, response = app.asgi_client.get("/") +# assert request.content_type == DEFAULT_HTTP_CONTENT_TYPE +# assert response.text == DEFAULT_HTTP_CONTENT_TYPE + +# headers = {"content-type": "application/json"} +# request, response = app.asgi_client.get("/", headers=headers) +# assert request.content_type == "application/json" +# assert response.text == "application/json" + + +# def test_remote_addr_with_two_proxies(app): +# app.config.PROXIES_COUNT = 2 + +# @app.route("/") +# async def handler(request): +# return text(request.remote_addr) + +# headers = {"X-Real-IP": "127.0.0.2", "X-Forwarded-For": "127.0.1.1"} +# request, response = app.asgi_client.get("/", headers=headers) +# assert request.remote_addr == "127.0.0.2" +# assert response.text == "127.0.0.2" + +# headers = {"X-Forwarded-For": "127.0.1.1"} +# request, response = app.asgi_client.get("/", headers=headers) +# assert request.remote_addr == "" +# assert response.text == "" + +# headers = {"X-Forwarded-For": "127.0.0.1, 127.0.1.2"} +# request, response = app.asgi_client.get("/", headers=headers) +# assert request.remote_addr == "127.0.0.1" +# assert response.text == "127.0.0.1" + +# request, response = app.asgi_client.get("/") +# assert request.remote_addr == "" +# assert response.text == "" + +# headers = {"X-Forwarded-For": "127.0.0.1, , ,,127.0.1.2"} +# request, response = app.asgi_client.get("/", headers=headers) +# assert request.remote_addr == "127.0.0.1" +# assert response.text == "127.0.0.1" + +# headers = { +# "X-Forwarded-For": ", 127.0.2.2, , ,127.0.0.1, , ,,127.0.1.2" +# } +# request, response = app.asgi_client.get("/", headers=headers) +# assert request.remote_addr == "127.0.0.1" +# assert response.text == "127.0.0.1" + + +# def test_remote_addr_with_infinite_number_of_proxies(app): +# app.config.PROXIES_COUNT = -1 + +# @app.route("/") +# async def handler(request): +# return text(request.remote_addr) + +# headers = {"X-Real-IP": "127.0.0.2", "X-Forwarded-For": "127.0.1.1"} +# request, response = app.asgi_client.get("/", headers=headers) +# assert request.remote_addr == "127.0.0.2" +# assert response.text == "127.0.0.2" + +# headers = {"X-Forwarded-For": "127.0.1.1"} +# request, response = app.asgi_client.get("/", headers=headers) +# assert request.remote_addr == "127.0.1.1" +# assert response.text == "127.0.1.1" + +# headers = { +# "X-Forwarded-For": "127.0.0.5, 127.0.0.4, 127.0.0.3, 127.0.0.2, 127.0.0.1" +# } +# request, response = app.asgi_client.get("/", headers=headers) +# assert request.remote_addr == "127.0.0.5" +# assert response.text == "127.0.0.5" + + +# def test_remote_addr_without_proxy(app): +# app.config.PROXIES_COUNT = 0 + +# @app.route("/") +# async def handler(request): +# return text(request.remote_addr) + +# headers = {"X-Real-IP": "127.0.0.2", "X-Forwarded-For": "127.0.1.1"} +# request, response = app.asgi_client.get("/", headers=headers) +# assert request.remote_addr == "" +# assert response.text == "" + +# headers = {"X-Forwarded-For": "127.0.1.1"} +# request, response = app.asgi_client.get("/", headers=headers) +# assert request.remote_addr == "" +# assert response.text == "" + +# headers = {"X-Forwarded-For": "127.0.0.1, 127.0.1.2"} +# request, response = app.asgi_client.get("/", headers=headers) +# assert request.remote_addr == "" +# assert response.text == "" + + +# def test_remote_addr_custom_headers(app): +# app.config.PROXIES_COUNT = 1 +# app.config.REAL_IP_HEADER = "Client-IP" +# app.config.FORWARDED_FOR_HEADER = "Forwarded" + +# @app.route("/") +# async def handler(request): +# return text(request.remote_addr) + +# headers = {"X-Real-IP": "127.0.0.2", "Forwarded": "127.0.1.1"} +# request, response = app.asgi_client.get("/", headers=headers) +# assert request.remote_addr == "127.0.1.1" +# assert response.text == "127.0.1.1" + +# headers = {"X-Forwarded-For": "127.0.1.1"} +# request, response = app.asgi_client.get("/", headers=headers) +# assert request.remote_addr == "" +# assert response.text == "" + +# headers = {"Client-IP": "127.0.0.2", "Forwarded": "127.0.1.1"} +# request, response = app.asgi_client.get("/", headers=headers) +# assert request.remote_addr == "127.0.0.2" +# assert response.text == "127.0.0.2" + + +# def test_match_info(app): +# @app.route("/api/v1/user//") +# async def handler(request, user_id): +# return json(request.match_info) + +# request, response = app.asgi_client.get("/api/v1/user/sanic_user/") + +# assert request.match_info == {"user_id": "sanic_user"} +# assert json_loads(response.text) == {"user_id": "sanic_user"} + + +# # ------------------------------------------------------------ # +# # POST +# # ------------------------------------------------------------ # + + +# def test_post_json(app): +# @app.route("/", methods=["POST"]) +# async def handler(request): +# return text("OK") + +# payload = {"test": "OK"} +# headers = {"content-type": "application/json"} + +# request, response = app.asgi_client.post( +# "/", data=json_dumps(payload), headers=headers +# ) + +# assert request.json.get("test") == "OK" +# assert request.json.get("test") == "OK" # for request.parsed_json +# assert response.text == "OK" + + +# def test_post_form_urlencoded(app): +# @app.route("/", methods=["POST"]) +# async def handler(request): +# return text("OK") + +# payload = "test=OK" +# headers = {"content-type": "application/x-www-form-urlencoded"} + +# request, response = app.asgi_client.post( +# "/", data=payload, headers=headers +# ) + +# assert request.form.get("test") == "OK" +# assert request.form.get("test") == "OK" # For request.parsed_form + + +# @pytest.mark.parametrize( +# "payload", +# [ +# "------sanic\r\n" +# 'Content-Disposition: form-data; name="test"\r\n' +# "\r\n" +# "OK\r\n" +# "------sanic--\r\n", +# "------sanic\r\n" +# 'content-disposition: form-data; name="test"\r\n' +# "\r\n" +# "OK\r\n" +# "------sanic--\r\n", +# ], +# ) +# def test_post_form_multipart_form_data(app, payload): +# @app.route("/", methods=["POST"]) +# async def handler(request): +# return text("OK") + +# headers = {"content-type": "multipart/form-data; boundary=----sanic"} + +# request, response = app.asgi_client.post(data=payload, headers=headers) + +# assert request.form.get("test") == "OK" + + +# @pytest.mark.parametrize( +# "path,query,expected_url", +# [ +# ("/foo", "", "http://{}:{}/foo"), +# ("/bar/baz", "", "http://{}:{}/bar/baz"), +# ("/moo/boo", "arg1=val1", "http://{}:{}/moo/boo?arg1=val1"), +# ], +# ) +# def test_url_attributes_no_ssl(app, path, query, expected_url): +# async def handler(request): +# return text("OK") + +# app.add_route(handler, path) + +# request, response = app.asgi_client.get(path + "?{}".format(query)) +# assert request.url == expected_url.format(HOST, PORT) + +# parsed = urlparse(request.url) + +# assert parsed.scheme == request.scheme +# assert parsed.path == request.path +# assert parsed.query == request.query_string +# assert parsed.netloc == request.host + + +# @pytest.mark.parametrize( +# "path,query,expected_url", +# [ +# ("/foo", "", "https://{}:{}/foo"), +# ("/bar/baz", "", "https://{}:{}/bar/baz"), +# ("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"), +# ], +# ) +# def test_url_attributes_with_ssl_context(app, path, query, expected_url): +# current_dir = os.path.dirname(os.path.realpath(__file__)) +# context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) +# context.load_cert_chain( +# os.path.join(current_dir, "certs/selfsigned.cert"), +# keyfile=os.path.join(current_dir, "certs/selfsigned.key"), +# ) + +# async def handler(request): +# return text("OK") + +# app.add_route(handler, path) + +# request, response = app.asgi_client.get( +# "https://{}:{}".format(HOST, PORT) + path + "?{}".format(query), +# server_kwargs={"ssl": context}, +# ) +# assert request.url == expected_url.format(HOST, PORT) + +# parsed = urlparse(request.url) + +# assert parsed.scheme == request.scheme +# assert parsed.path == request.path +# assert parsed.query == request.query_string +# assert parsed.netloc == request.host + + +# @pytest.mark.parametrize( +# "path,query,expected_url", +# [ +# ("/foo", "", "https://{}:{}/foo"), +# ("/bar/baz", "", "https://{}:{}/bar/baz"), +# ("/moo/boo", "arg1=val1", "https://{}:{}/moo/boo?arg1=val1"), +# ], +# ) +# def test_url_attributes_with_ssl_dict(app, path, query, expected_url): + +# current_dir = os.path.dirname(os.path.realpath(__file__)) +# ssl_cert = os.path.join(current_dir, "certs/selfsigned.cert") +# ssl_key = os.path.join(current_dir, "certs/selfsigned.key") + +# ssl_dict = {"cert": ssl_cert, "key": ssl_key} + +# async def handler(request): +# return text("OK") + +# app.add_route(handler, path) + +# request, response = app.asgi_client.get( +# "https://{}:{}".format(HOST, PORT) + path + "?{}".format(query), +# server_kwargs={"ssl": ssl_dict}, +# ) +# assert request.url == expected_url.format(HOST, PORT) + +# parsed = urlparse(request.url) + +# assert parsed.scheme == request.scheme +# assert parsed.path == request.path +# assert parsed.query == request.query_string +# assert parsed.netloc == request.host + + +# def test_invalid_ssl_dict(app): +# @app.get("/test") +# async def handler(request): +# return text("ssl test") + +# ssl_dict = {"cert": None, "key": None} + +# with pytest.raises(ValueError) as excinfo: +# request, response = app.asgi_client.get( +# "/test", server_kwargs={"ssl": ssl_dict} +# ) + +# assert str(excinfo.value) == "SSLContext or certificate and key required." + + +# def test_form_with_multiple_values(app): +# @app.route("/", methods=["POST"]) +# async def handler(request): +# return text("OK") + +# payload = "selectedItems=v1&selectedItems=v2&selectedItems=v3" + +# headers = {"content-type": "application/x-www-form-urlencoded"} + +# request, response = app.asgi_client.post( +# "/", data=payload, headers=headers +# ) + +# assert request.form.getlist("selectedItems") == ["v1", "v2", "v3"] + + +# def test_request_string_representation(app): +# @app.route("/", methods=["GET"]) +# async def get(request): +# return text("OK") + +# request, _ = app.asgi_client.get("/") +# assert repr(request) == "" + + +# @pytest.mark.parametrize( +# "payload,filename", +# [ +# ( +# "------sanic\r\n" +# 'Content-Disposition: form-data; filename="filename"; name="test"\r\n' +# "\r\n" +# "OK\r\n" +# "------sanic--\r\n", +# "filename", +# ), +# ( +# "------sanic\r\n" +# 'content-disposition: form-data; filename="filename"; name="test"\r\n' +# "\r\n" +# 'content-type: application/json; {"field": "value"}\r\n' +# "------sanic--\r\n", +# "filename", +# ), +# ( +# "------sanic\r\n" +# 'Content-Disposition: form-data; filename=""; name="test"\r\n' +# "\r\n" +# "OK\r\n" +# "------sanic--\r\n", +# "", +# ), +# ( +# "------sanic\r\n" +# 'content-disposition: form-data; filename=""; name="test"\r\n' +# "\r\n" +# 'content-type: application/json; {"field": "value"}\r\n' +# "------sanic--\r\n", +# "", +# ), +# ( +# "------sanic\r\n" +# 'Content-Disposition: form-data; filename*="utf-8\'\'filename_%C2%A0_test"; name="test"\r\n' +# "\r\n" +# "OK\r\n" +# "------sanic--\r\n", +# "filename_\u00A0_test", +# ), +# ( +# "------sanic\r\n" +# 'content-disposition: form-data; filename*="utf-8\'\'filename_%C2%A0_test"; name="test"\r\n' +# "\r\n" +# 'content-type: application/json; {"field": "value"}\r\n' +# "------sanic--\r\n", +# "filename_\u00A0_test", +# ), +# ], +# ) +# def test_request_multipart_files(app, payload, filename): +# @app.route("/", methods=["POST"]) +# async def post(request): +# return text("OK") + +# headers = {"content-type": "multipart/form-data; boundary=----sanic"} + +# request, _ = app.asgi_client.post(data=payload, headers=headers) +# assert request.files.get("test").name == filename + + +# def test_request_multipart_file_with_json_content_type(app): +# @app.route("/", methods=["POST"]) +# async def post(request): +# return text("OK") + +# payload = ( +# "------sanic\r\n" +# 'Content-Disposition: form-data; name="file"; filename="test.json"\r\n' +# "Content-Type: application/json\r\n" +# "Content-Length: 0" +# "\r\n" +# "\r\n" +# "------sanic--" +# ) + +# headers = {"content-type": "multipart/form-data; boundary=------sanic"} + +# request, _ = app.asgi_client.post(data=payload, headers=headers) +# assert request.files.get("file").type == "application/json" + + +# def test_request_multipart_file_without_field_name(app, caplog): +# @app.route("/", methods=["POST"]) +# async def post(request): +# return text("OK") + +# payload = ( +# '------sanic\r\nContent-Disposition: form-data; filename="test.json"' +# "\r\nContent-Type: application/json\r\n\r\n\r\n------sanic--" +# ) + +# headers = {"content-type": "multipart/form-data; boundary=------sanic"} + +# request, _ = app.asgi_client.post( +# data=payload, headers=headers, debug=True +# ) +# with caplog.at_level(logging.DEBUG): +# request.form + +# assert caplog.record_tuples[-1] == ( +# "sanic.root", +# logging.DEBUG, +# "Form-data field does not have a 'name' parameter " +# "in the Content-Disposition header", +# ) + + +# def test_request_multipart_file_duplicate_filed_name(app): +# @app.route("/", methods=["POST"]) +# async def post(request): +# return text("OK") + +# payload = ( +# "--e73ffaa8b1b2472b8ec848de833cb05b\r\n" +# 'Content-Disposition: form-data; name="file"\r\n' +# "Content-Type: application/octet-stream\r\n" +# "Content-Length: 15\r\n" +# "\r\n" +# '{"test":"json"}\r\n' +# "--e73ffaa8b1b2472b8ec848de833cb05b\r\n" +# 'Content-Disposition: form-data; name="file"\r\n' +# "Content-Type: application/octet-stream\r\n" +# "Content-Length: 15\r\n" +# "\r\n" +# '{"test":"json2"}\r\n' +# "--e73ffaa8b1b2472b8ec848de833cb05b--\r\n" +# ) + +# headers = { +# "Content-Type": "multipart/form-data; boundary=e73ffaa8b1b2472b8ec848de833cb05b" +# } + +# request, _ = app.asgi_client.post( +# data=payload, headers=headers, debug=True +# ) +# assert request.form.getlist("file") == [ +# '{"test":"json"}', +# '{"test":"json2"}', +# ] + + +# def test_request_multipart_with_multiple_files_and_type(app): +# @app.route("/", methods=["POST"]) +# async def post(request): +# return text("OK") + +# payload = ( +# '------sanic\r\nContent-Disposition: form-data; name="file"; filename="test.json"' +# "\r\nContent-Type: application/json\r\n\r\n\r\n" +# '------sanic\r\nContent-Disposition: form-data; name="file"; filename="some_file.pdf"\r\n' +# "Content-Type: application/pdf\r\n\r\n\r\n------sanic--" +# ) +# headers = {"content-type": "multipart/form-data; boundary=------sanic"} + +# request, _ = app.asgi_client.post(data=payload, headers=headers) +# assert len(request.files.getlist("file")) == 2 +# assert request.files.getlist("file")[0].type == "application/json" +# assert request.files.getlist("file")[1].type == "application/pdf" + + +# def test_request_repr(app): +# @app.get("/") +# def handler(request): +# return text("pass") + +# request, response = app.asgi_client.get("/") +# assert repr(request) == "" + +# request.method = None +# assert repr(request) == "" + + +# def test_request_bool(app): +# @app.get("/") +# def handler(request): +# return text("pass") + +# request, response = app.asgi_client.get("/") +# assert bool(request) + +# request.transport = False +# assert not bool(request) + + +# def test_request_parsing_form_failed(app, caplog): +# @app.route("/", methods=["POST"]) +# async def handler(request): +# return text("OK") + +# payload = "test=OK" +# headers = {"content-type": "multipart/form-data"} + +# request, response = app.asgi_client.post( +# "/", data=payload, headers=headers +# ) + +# with caplog.at_level(logging.ERROR): +# request.form + +# assert caplog.record_tuples[-1] == ( +# "sanic.error", +# logging.ERROR, +# "Failed when parsing form", +# ) + + +# def test_request_args_no_query_string(app): +# @app.get("/") +# def handler(request): +# return text("pass") + +# request, response = app.asgi_client.get("/") + +# assert request.args == {} + + +# def test_request_raw_args(app): + +# params = {"test": "OK"} + +# @app.get("/") +# def handler(request): +# return text("pass") + +# request, response = app.asgi_client.get("/", params=params) + +# assert request.raw_args == params + + +# def test_request_query_args(app): +# # test multiple params with the same key +# params = [("test", "value1"), ("test", "value2")] + +# @app.get("/") +# def handler(request): +# return text("pass") + +# request, response = app.asgi_client.get("/", params=params) + +# assert request.query_args == params + +# # test cached value +# assert ( +# request.parsed_not_grouped_args[(False, False, "utf-8", "replace")] +# == request.query_args +# ) + +# # test params directly in the url +# request, response = app.asgi_client.get("/?test=value1&test=value2") + +# assert request.query_args == params + +# # test unique params +# params = [("test1", "value1"), ("test2", "value2")] + +# request, response = app.asgi_client.get("/", params=params) + +# assert request.query_args == params + +# # test no params +# request, response = app.asgi_client.get("/") + +# assert not request.query_args + + +# def test_request_query_args_custom_parsing(app): +# @app.get("/") +# def handler(request): +# return text("pass") + +# request, response = app.asgi_client.get( +# "/?test1=value1&test2=&test3=value3" +# ) + +# assert request.get_query_args(keep_blank_values=True) == [ +# ("test1", "value1"), +# ("test2", ""), +# ("test3", "value3"), +# ] +# assert request.query_args == [("test1", "value1"), ("test3", "value3")] +# assert request.get_query_args(keep_blank_values=False) == [ +# ("test1", "value1"), +# ("test3", "value3"), +# ] + +# assert request.get_args(keep_blank_values=True) == RequestParameters( +# {"test1": ["value1"], "test2": [""], "test3": ["value3"]} +# ) + +# assert request.args == RequestParameters( +# {"test1": ["value1"], "test3": ["value3"]} +# ) + +# assert request.get_args(keep_blank_values=False) == RequestParameters( +# {"test1": ["value1"], "test3": ["value3"]} +# ) + + +# def test_request_cookies(app): + +# cookies = {"test": "OK"} + +# @app.get("/") +# def handler(request): +# return text("OK") + +# request, response = app.asgi_client.get("/", cookies=cookies) + +# assert request.cookies == cookies +# assert request.cookies == cookies # For request._cookies + + +# def test_request_cookies_without_cookies(app): +# @app.get("/") +# def handler(request): +# return text("OK") + +# request, response = app.asgi_client.get("/") + +# assert request.cookies == {} + + +# def test_request_port(app): +# @app.get("/") +# def handler(request): +# return text("OK") + +# request, response = app.asgi_client.get("/") + +# port = request.port +# assert isinstance(port, int) + +# delattr(request, "_socket") +# delattr(request, "_port") + +# port = request.port +# assert isinstance(port, int) +# assert hasattr(request, "_socket") +# assert hasattr(request, "_port") + + +# def test_request_socket(app): +# @app.get("/") +# def handler(request): +# return text("OK") + +# request, response = app.asgi_client.get("/") + +# socket = request.socket +# assert isinstance(socket, tuple) + +# ip = socket[0] +# port = socket[1] + +# assert ip == request.ip +# assert port == request.port + +# delattr(request, "_socket") + +# socket = request.socket +# assert isinstance(socket, tuple) +# assert hasattr(request, "_socket") + + +# def test_request_form_invalid_content_type(app): +# @app.route("/", methods=["POST"]) +# async def post(request): +# return text("OK") + +# request, response = app.asgi_client.post("/", json={"test": "OK"}) + +# assert request.form == {} + + +# def test_endpoint_basic(): +# app = Sanic() + +# @app.route("/") +# def my_unique_handler(request): +# return text("Hello") + +# request, response = app.asgi_client.get("/") + +# assert request.endpoint == "test_requests.my_unique_handler" + + +# def test_endpoint_named_app(): +# app = Sanic("named") + +# @app.route("/") +# def my_unique_handler(request): +# return text("Hello") + +# request, response = app.asgi_client.get("/") + +# assert request.endpoint == "named.my_unique_handler" + + +# def test_endpoint_blueprint(): +# bp = Blueprint("my_blueprint", url_prefix="/bp") + +# @bp.route("/") +# async def bp_root(request): +# return text("Hello") + +# app = Sanic("named") +# app.blueprint(bp) + +# request, response = app.asgi_client.get("/bp") + +# assert request.endpoint == "named.my_blueprint.bp_root"