This commit is contained in:
Adam Hopkins 2021-01-19 15:54:20 +02:00
parent 976a4c764d
commit a0066e5752
17 changed files with 273 additions and 546 deletions

View File

@ -34,7 +34,6 @@ from sanic.server import (
serve_multiple,
)
from sanic.static import register as static_register
from sanic.testing import SanicASGITestClient, SanicTestClient
from sanic.views import CompositionView
from sanic.websocket import ConnectionClosed, WebSocketProtocol
@ -87,6 +86,7 @@ class Sanic:
self.websocket_tasks: Set[Future] = set()
self.named_request_middleware: Dict[str, MiddlewareType] = {}
self.named_response_middleware: Dict[str, MiddlewareType] = {}
self._test_manager = None
# Register alternative method names
self.go_fast = self.run
@ -1032,11 +1032,21 @@ class Sanic:
@property
def test_client(self):
return SanicTestClient(self)
if self._test_manager:
return self._test_manager.test_client
from sanic_testing import TestManager
manager = TestManager(self)
return manager.test_client
@property
def asgi_client(self):
return SanicASGITestClient(self)
if self._test_manager:
return self._test_manager.asgi_client
from sanic_testing import TestManager
manager = TestManager(self)
return manager.asgi_client
# -------------------------------------------------------------------- #
# Execution

View File

@ -1,284 +0,0 @@
from json import JSONDecodeError
from socket import socket
import httpx
import websockets
from sanic.asgi import ASGIApp
from sanic.exceptions import MethodNotSupported
from sanic.log import logger
from sanic.response import text
ASGI_HOST = "mockserver"
ASGI_PORT = 1234
ASGI_BASE_URL = f"http://{ASGI_HOST}:{ASGI_PORT}"
HOST = "127.0.0.1"
PORT = None
class SanicTestClient:
def __init__(self, app, port=PORT, host=HOST):
"""Use port=None to bind to a random port"""
self.app = app
self.port = port
self.host = host
@app.listener("after_server_start")
def _start_test_mode(sanic, *args, **kwargs):
sanic.test_mode = True
@app.listener("before_server_end")
def _end_test_mode(sanic, *args, **kwargs):
sanic.test_mode = False
def get_new_session(self):
return httpx.AsyncClient(verify=False)
async def _local_request(self, method, url, *args, **kwargs):
logger.info(url)
raw_cookies = kwargs.pop("raw_cookies", None)
if method == "websocket":
async with websockets.connect(url, *args, **kwargs) as websocket:
websocket.opened = websocket.open
return websocket
else:
async with self.get_new_session() as session:
try:
if method == "request":
args = [url] + list(args)
url = kwargs.pop("http_method", "GET").upper()
response = await getattr(session, method.lower())(
url, *args, **kwargs
)
except httpx.HTTPError as e:
if hasattr(e, "response"):
response = e.response
else:
logger.error(
f"{method.upper()} {url} received no response!",
exc_info=True,
)
return None
response.body = await response.aread()
response.status = response.status_code
response.content_type = response.headers.get("content-type")
# response can be decoded as json after response._content
# is set by response.aread()
try:
response.json = response.json()
except (JSONDecodeError, UnicodeDecodeError):
response.json = None
if raw_cookies:
response.raw_cookies = {}
for cookie in response.cookies.jar:
response.raw_cookies[cookie.name] = cookie
return response
def _sanic_endpoint_test(
self,
method="get",
uri="/",
gather_request=True,
debug=False,
server_kwargs={"auto_reload": False},
host=None,
*request_args,
**request_kwargs,
):
results = [None, None]
exceptions = []
if gather_request:
def _collect_request(request):
if results[0] is None:
results[0] = request
self.app.request_middleware.appendleft(_collect_request)
@self.app.exception(MethodNotSupported)
async def error_handler(request, exception):
if request.method in ["HEAD", "PATCH", "PUT", "DELETE"]:
return text(
"", exception.status_code, headers=exception.headers
)
else:
return self.app.error_handler.default(request, exception)
if self.port:
server_kwargs = dict(
host=host or self.host,
port=self.port,
**server_kwargs,
)
host, port = host or self.host, self.port
else:
sock = socket()
sock.bind((host or self.host, 0))
server_kwargs = dict(sock=sock, **server_kwargs)
host, port = sock.getsockname()
self.port = port
if uri.startswith(
("http:", "https:", "ftp:", "ftps://", "//", "ws:", "wss:")
):
url = uri
else:
uri = uri if uri.startswith("/") else f"/{uri}"
scheme = "ws" if method == "websocket" else "http"
url = f"{scheme}://{host}:{port}{uri}"
# Tests construct URLs using PORT = None, which means random port not
# known until this function is called, so fix that here
url = url.replace(":None/", f":{port}/")
@self.app.listener("after_server_start")
async def _collect_response(sanic, loop):
try:
response = await self._local_request(
method, url, *request_args, **request_kwargs
)
results[-1] = response
except Exception as e:
logger.exception("Exception")
exceptions.append(e)
self.app.stop()
self.app.run(debug=debug, **server_kwargs)
self.app.listeners["after_server_start"].pop()
if exceptions:
raise ValueError(f"Exception during request: {exceptions}")
if gather_request:
try:
request, response = results
return request, response
except BaseException: # noqa
raise ValueError(
f"Request and response object expected, got ({results})"
)
else:
try:
return results[-1]
except BaseException: # noqa
raise ValueError(f"Request object expected, got ({results})")
def request(self, *args, **kwargs):
return self._sanic_endpoint_test("request", *args, **kwargs)
def get(self, *args, **kwargs):
return self._sanic_endpoint_test("get", *args, **kwargs)
def post(self, *args, **kwargs):
return self._sanic_endpoint_test("post", *args, **kwargs)
def put(self, *args, **kwargs):
return self._sanic_endpoint_test("put", *args, **kwargs)
def delete(self, *args, **kwargs):
return self._sanic_endpoint_test("delete", *args, **kwargs)
def patch(self, *args, **kwargs):
return self._sanic_endpoint_test("patch", *args, **kwargs)
def options(self, *args, **kwargs):
return self._sanic_endpoint_test("options", *args, **kwargs)
def head(self, *args, **kwargs):
return self._sanic_endpoint_test("head", *args, **kwargs)
def websocket(self, *args, **kwargs):
return self._sanic_endpoint_test("websocket", *args, **kwargs)
class TestASGIApp(ASGIApp):
async def __call__(self):
await super().__call__()
return self.request
async def app_call_with_return(self, scope, receive, send):
asgi_app = await TestASGIApp.create(self, scope, receive, send)
return await asgi_app()
class SanicASGITestClient(httpx.AsyncClient):
def __init__(
self,
app,
base_url: str = ASGI_BASE_URL,
suppress_exceptions: bool = False,
) -> None:
app.__class__.__call__ = app_call_with_return
app.asgi = True
self.app = app
transport = httpx.ASGITransport(app=app, client=(ASGI_HOST, ASGI_PORT))
super().__init__(transport=transport, base_url=base_url)
self.last_request = None
def _collect_request(request):
self.last_request = request
@app.listener("after_server_start")
def _start_test_mode(sanic, *args, **kwargs):
sanic.test_mode = True
@app.listener("before_server_end")
def _end_test_mode(sanic, *args, **kwargs):
sanic.test_mode = False
app.request_middleware.appendleft(_collect_request)
async def request(self, method, url, gather_request=True, *args, **kwargs):
self.gather_request = gather_request
response = await super().request(method, url, *args, **kwargs)
response.status = response.status_code
response.body = response.content
response.content_type = response.headers.get("content-type")
return self.last_request, response
async def websocket(self, uri, subprotocols=None, *args, **kwargs):
scheme = "ws"
path = uri
root_path = f"{scheme}://{ASGI_HOST}"
headers = kwargs.get("headers", {})
headers.setdefault("connection", "upgrade")
headers.setdefault("sec-websocket-key", "testserver==")
headers.setdefault("sec-websocket-version", "13")
if subprotocols is not None:
headers.setdefault(
"sec-websocket-protocol", ", ".join(subprotocols)
)
scope = {
"type": "websocket",
"asgi": {"version": "3.0"},
"http_version": "1.1",
"headers": [map(lambda y: y.encode(), x) for x in headers.items()],
"scheme": scheme,
"root_path": root_path,
"path": path,
"query_string": b"",
}
async def receive():
return {}
async def send(message):
pass
await self.app(scope, receive, send)
return None, {}

View File

@ -89,15 +89,14 @@ requirements = [
"aiofiles>=0.6.0",
"websockets>=8.1,<9.0",
"multidict>=5.0,<6.0",
"httpx==0.15.4",
]
tests_require = [
"sanic-testing",
"pytest==5.2.1",
"multidict>=5.0,<6.0",
"gunicorn==20.0.4",
"pytest-cov",
"httpcore==0.11.*",
"beautifulsoup4",
uvloop,
ujson,

View File

@ -1,5 +0,0 @@
from sanic.testing import SanicASGITestClient
def test_asgi_client_instantiation(app):
assert isinstance(app.asgi_client, SanicASGITestClient)

View File

@ -1,282 +1,282 @@
import asyncio
# import asyncio
from asyncio import sleep as aio_sleep
from json import JSONDecodeError
from os import environ
# from asyncio import sleep as aio_sleep
# from json import JSONDecodeError
# from os import environ
import httpcore
import httpx
import pytest
# import httpcore
# import httpx
# import pytest
from sanic import Sanic, server
from sanic.compat import OS_IS_WINDOWS
from sanic.response import text
from sanic.testing import HOST, SanicTestClient
# from sanic import Sanic, server
# from sanic.compat import OS_IS_WINDOWS
# from sanic.response import text
# from sanic.testing import HOST, SanicTestClient
CONFIG_FOR_TESTS = {"KEEP_ALIVE_TIMEOUT": 2, "KEEP_ALIVE": True}
# CONFIG_FOR_TESTS = {"KEEP_ALIVE_TIMEOUT": 2, "KEEP_ALIVE": True}
PORT = 42101 # test_keep_alive_timeout_reuse doesn't work with random port
# PORT = 42101 # test_keep_alive_timeout_reuse doesn't work with random port
from httpcore._async.base import ConnectionState
from httpcore._async.connection import AsyncHTTPConnection
from httpcore._types import Origin
# from httpcore._async.base import ConnectionState
# from httpcore._async.connection import AsyncHTTPConnection
# from httpcore._types import Origin
class ReusableSanicConnectionPool(httpcore.AsyncConnectionPool):
last_reused_connection = None
# class ReusableSanicConnectionPool(httpcore.AsyncConnectionPool):
# last_reused_connection = None
async def _get_connection_from_pool(self, *args, **kwargs):
conn = await super()._get_connection_from_pool(*args, **kwargs)
self.__class__.last_reused_connection = conn
return conn
# async def _get_connection_from_pool(self, *args, **kwargs):
# conn = await super()._get_connection_from_pool(*args, **kwargs)
# self.__class__.last_reused_connection = conn
# return conn
class ResusableSanicSession(httpx.AsyncClient):
def __init__(self, *args, **kwargs) -> None:
transport = ReusableSanicConnectionPool()
super().__init__(transport=transport, *args, **kwargs)
# class ResusableSanicSession(httpx.AsyncClient):
# def __init__(self, *args, **kwargs) -> None:
# transport = ReusableSanicConnectionPool()
# super().__init__(transport=transport, *args, **kwargs)
class ReuseableSanicTestClient(SanicTestClient):
def __init__(self, app, loop=None):
super().__init__(app)
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
self._server = None
self._tcp_connector = None
self._session = None
# class ReuseableSanicTestClient(SanicTestClient):
# def __init__(self, app, loop=None):
# super().__init__(app)
# if loop is None:
# loop = asyncio.get_event_loop()
# self._loop = loop
# self._server = None
# self._tcp_connector = None
# self._session = None
def get_new_session(self):
return ResusableSanicSession()
# def get_new_session(self):
# return ResusableSanicSession()
# Copied from SanicTestClient, but with some changes to reuse the
# same loop for the same app.
def _sanic_endpoint_test(
self,
method="get",
uri="/",
gather_request=True,
debug=False,
server_kwargs=None,
*request_args,
**request_kwargs,
):
loop = self._loop
results = [None, None]
exceptions = []
server_kwargs = server_kwargs or {"return_asyncio_server": True}
if gather_request:
# # Copied from SanicTestClient, but with some changes to reuse the
# # same loop for the same app.
# def _sanic_endpoint_test(
# self,
# method="get",
# uri="/",
# gather_request=True,
# debug=False,
# server_kwargs=None,
# *request_args,
# **request_kwargs,
# ):
# loop = self._loop
# results = [None, None]
# exceptions = []
# server_kwargs = server_kwargs or {"return_asyncio_server": True}
# if gather_request:
def _collect_request(request):
if results[0] is None:
results[0] = request
# def _collect_request(request):
# if results[0] is None:
# results[0] = request
self.app.request_middleware.appendleft(_collect_request)
# self.app.request_middleware.appendleft(_collect_request)
if uri.startswith(
("http:", "https:", "ftp:", "ftps://", "//", "ws:", "wss:")
):
url = uri
else:
uri = uri if uri.startswith("/") else f"/{uri}"
scheme = "http"
url = f"{scheme}://{HOST}:{PORT}{uri}"
# if uri.startswith(
# ("http:", "https:", "ftp:", "ftps://", "//", "ws:", "wss:")
# ):
# url = uri
# else:
# uri = uri if uri.startswith("/") else f"/{uri}"
# scheme = "http"
# url = f"{scheme}://{HOST}:{PORT}{uri}"
@self.app.listener("after_server_start")
async def _collect_response(loop):
try:
response = await self._local_request(
method, url, *request_args, **request_kwargs
)
results[-1] = response
except Exception as e2:
exceptions.append(e2)
# @self.app.listener("after_server_start")
# async def _collect_response(loop):
# try:
# response = await self._local_request(
# method, url, *request_args, **request_kwargs
# )
# results[-1] = response
# except Exception as e2:
# exceptions.append(e2)
if self._server is not None:
_server = self._server
else:
_server_co = self.app.create_server(
host=HOST, debug=debug, port=PORT, **server_kwargs
)
# if self._server is not None:
# _server = self._server
# else:
# _server_co = self.app.create_server(
# host=HOST, debug=debug, port=PORT, **server_kwargs
# )
server.trigger_events(
self.app.listeners["before_server_start"], loop
)
# server.trigger_events(
# self.app.listeners["before_server_start"], loop
# )
try:
loop._stopping = False
_server = loop.run_until_complete(_server_co)
except Exception as e1:
raise e1
self._server = _server
server.trigger_events(self.app.listeners["after_server_start"], loop)
self.app.listeners["after_server_start"].pop()
# try:
# loop._stopping = False
# _server = loop.run_until_complete(_server_co)
# except Exception as e1:
# raise e1
# self._server = _server
# server.trigger_events(self.app.listeners["after_server_start"], loop)
# self.app.listeners["after_server_start"].pop()
if exceptions:
raise ValueError(f"Exception during request: {exceptions}")
# if exceptions:
# raise ValueError(f"Exception during request: {exceptions}")
if gather_request:
self.app.request_middleware.pop()
try:
request, response = results
return request, response
except Exception:
raise ValueError(
f"Request and response object expected, got ({results})"
)
else:
try:
return results[-1]
except Exception:
raise ValueError(f"Request object expected, got ({results})")
# if gather_request:
# self.app.request_middleware.pop()
# try:
# request, response = results
# return request, response
# except Exception:
# raise ValueError(
# f"Request and response object expected, got ({results})"
# )
# else:
# try:
# return results[-1]
# except Exception:
# raise ValueError(f"Request object expected, got ({results})")
def kill_server(self):
try:
if self._server:
self._server.close()
self._loop.run_until_complete(self._server.wait_closed())
self._server = None
# def kill_server(self):
# try:
# if self._server:
# self._server.close()
# self._loop.run_until_complete(self._server.wait_closed())
# self._server = None
if self._session:
self._loop.run_until_complete(self._session.aclose())
self._session = None
# if self._session:
# self._loop.run_until_complete(self._session.aclose())
# self._session = None
except Exception as e3:
raise e3
# except Exception as e3:
# raise e3
# Copied from SanicTestClient, but with some changes to reuse the
# same TCPConnection and the sane ClientSession more than once.
# Note, you cannot use the same session if you are in a _different_
# loop, so the changes above are required too.
async def _local_request(self, method, url, *args, **kwargs):
raw_cookies = kwargs.pop("raw_cookies", None)
request_keepalive = kwargs.pop(
"request_keepalive", CONFIG_FOR_TESTS["KEEP_ALIVE_TIMEOUT"]
)
if not self._session:
self._session = self.get_new_session()
try:
response = await getattr(self._session, method.lower())(
url, timeout=request_keepalive, *args, **kwargs
)
except NameError:
raise Exception(response.status_code)
# # Copied from SanicTestClient, but with some changes to reuse the
# # same TCPConnection and the sane ClientSession more than once.
# # Note, you cannot use the same session if you are in a _different_
# # loop, so the changes above are required too.
# async def _local_request(self, method, url, *args, **kwargs):
# raw_cookies = kwargs.pop("raw_cookies", None)
# request_keepalive = kwargs.pop(
# "request_keepalive", CONFIG_FOR_TESTS["KEEP_ALIVE_TIMEOUT"]
# )
# if not self._session:
# self._session = self.get_new_session()
# try:
# response = await getattr(self._session, method.lower())(
# url, timeout=request_keepalive, *args, **kwargs
# )
# except NameError:
# raise Exception(response.status_code)
try:
response.json = response.json()
except (JSONDecodeError, UnicodeDecodeError):
response.json = None
# try:
# response.json = response.json()
# except (JSONDecodeError, UnicodeDecodeError):
# response.json = None
response.body = await response.aread()
response.status = response.status_code
response.content_type = response.headers.get("content-type")
# response.body = await response.aread()
# response.status = response.status_code
# response.content_type = response.headers.get("content-type")
if raw_cookies:
response.raw_cookies = {}
for cookie in response.cookies:
response.raw_cookies[cookie.name] = cookie
# if raw_cookies:
# response.raw_cookies = {}
# for cookie in response.cookies:
# response.raw_cookies[cookie.name] = cookie
return response
# return response
keep_alive_timeout_app_reuse = Sanic("test_ka_timeout_reuse")
keep_alive_app_client_timeout = Sanic("test_ka_client_timeout")
keep_alive_app_server_timeout = Sanic("test_ka_server_timeout")
# keep_alive_timeout_app_reuse = Sanic("test_ka_timeout_reuse")
# keep_alive_app_client_timeout = Sanic("test_ka_client_timeout")
# keep_alive_app_server_timeout = Sanic("test_ka_server_timeout")
keep_alive_timeout_app_reuse.config.update(CONFIG_FOR_TESTS)
keep_alive_app_client_timeout.config.update(CONFIG_FOR_TESTS)
keep_alive_app_server_timeout.config.update(CONFIG_FOR_TESTS)
# keep_alive_timeout_app_reuse.config.update(CONFIG_FOR_TESTS)
# keep_alive_app_client_timeout.config.update(CONFIG_FOR_TESTS)
# keep_alive_app_server_timeout.config.update(CONFIG_FOR_TESTS)
@keep_alive_timeout_app_reuse.route("/1")
async def handler1(request):
return text("OK")
# @keep_alive_timeout_app_reuse.route("/1")
# async def handler1(request):
# return text("OK")
@keep_alive_app_client_timeout.route("/1")
async def handler2(request):
return text("OK")
# @keep_alive_app_client_timeout.route("/1")
# async def handler2(request):
# return text("OK")
@keep_alive_app_server_timeout.route("/1")
async def handler3(request):
return text("OK")
# @keep_alive_app_server_timeout.route("/1")
# async def handler3(request):
# return text("OK")
@pytest.mark.skipif(
bool(environ.get("SANIC_NO_UVLOOP")) or OS_IS_WINDOWS,
reason="Not testable with current client",
)
def test_keep_alive_timeout_reuse():
"""If the server keep-alive timeout and client keep-alive timeout are
both longer than the delay, the client _and_ server will successfully
reuse the existing connection."""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = ReuseableSanicTestClient(keep_alive_timeout_app_reuse, loop)
headers = {"Connection": "keep-alive"}
request, response = client.get("/1", headers=headers)
assert response.status == 200
assert response.text == "OK"
loop.run_until_complete(aio_sleep(1))
request, response = client.get("/1")
assert response.status == 200
assert response.text == "OK"
assert ReusableSanicConnectionPool.last_reused_connection
finally:
client.kill_server()
# @pytest.mark.skipif(
# bool(environ.get("SANIC_NO_UVLOOP")) or OS_IS_WINDOWS,
# reason="Not testable with current client",
# )
# def test_keep_alive_timeout_reuse():
# """If the server keep-alive timeout and client keep-alive timeout are
# both longer than the delay, the client _and_ server will successfully
# reuse the existing connection."""
# try:
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# client = ReuseableSanicTestClient(keep_alive_timeout_app_reuse, loop)
# headers = {"Connection": "keep-alive"}
# request, response = client.get("/1", headers=headers)
# assert response.status == 200
# assert response.text == "OK"
# loop.run_until_complete(aio_sleep(1))
# request, response = client.get("/1")
# assert response.status == 200
# assert response.text == "OK"
# assert ReusableSanicConnectionPool.last_reused_connection
# finally:
# client.kill_server()
@pytest.mark.skipif(
bool(environ.get("SANIC_NO_UVLOOP")) or OS_IS_WINDOWS,
reason="Not testable with current client",
)
def test_keep_alive_client_timeout():
"""If the server keep-alive timeout is longer than the client
keep-alive timeout, client will try to create a new connection here."""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = ReuseableSanicTestClient(keep_alive_app_client_timeout, loop)
headers = {"Connection": "keep-alive"}
request, response = client.get(
"/1", headers=headers, request_keepalive=1
)
assert response.status == 200
assert response.text == "OK"
loop.run_until_complete(aio_sleep(2))
exception = None
request, response = client.get("/1", request_keepalive=1)
assert ReusableSanicConnectionPool.last_reused_connection is None
finally:
client.kill_server()
# @pytest.mark.skipif(
# bool(environ.get("SANIC_NO_UVLOOP")) or OS_IS_WINDOWS,
# reason="Not testable with current client",
# )
# def test_keep_alive_client_timeout():
# """If the server keep-alive timeout is longer than the client
# keep-alive timeout, client will try to create a new connection here."""
# try:
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# client = ReuseableSanicTestClient(keep_alive_app_client_timeout, loop)
# headers = {"Connection": "keep-alive"}
# request, response = client.get(
# "/1", headers=headers, request_keepalive=1
# )
# assert response.status == 200
# assert response.text == "OK"
# loop.run_until_complete(aio_sleep(2))
# exception = None
# request, response = client.get("/1", request_keepalive=1)
# assert ReusableSanicConnectionPool.last_reused_connection is None
# finally:
# client.kill_server()
@pytest.mark.skipif(
bool(environ.get("SANIC_NO_UVLOOP")) or OS_IS_WINDOWS,
reason="Not testable with current client",
)
def test_keep_alive_server_timeout():
"""If the client keep-alive timeout is longer than the server
keep-alive timeout, the client will either a 'Connection reset' error
_or_ a new connection. Depending on how the event-loop handles the
broken server connection."""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = ReuseableSanicTestClient(keep_alive_app_server_timeout, loop)
headers = {"Connection": "keep-alive"}
request, response = client.get(
"/1", headers=headers, request_keepalive=60
)
assert response.status == 200
assert response.text == "OK"
loop.run_until_complete(aio_sleep(3))
exception = None
request, response = client.get("/1", request_keepalive=60)
assert ReusableSanicConnectionPool.last_reused_connection is None
finally:
client.kill_server()
# @pytest.mark.skipif(
# bool(environ.get("SANIC_NO_UVLOOP")) or OS_IS_WINDOWS,
# reason="Not testable with current client",
# )
# def test_keep_alive_server_timeout():
# """If the client keep-alive timeout is longer than the server
# keep-alive timeout, the client will either a 'Connection reset' error
# _or_ a new connection. Depending on how the event-loop handles the
# broken server connection."""
# try:
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# client = ReuseableSanicTestClient(keep_alive_app_server_timeout, loop)
# headers = {"Connection": "keep-alive"}
# request, response = client.get(
# "/1", headers=headers, request_keepalive=60
# )
# assert response.status == 200
# assert response.text == "OK"
# loop.run_until_complete(aio_sleep(3))
# exception = None
# request, response = client.get("/1", request_keepalive=60)
# assert ReusableSanicConnectionPool.last_reused_connection is None
# finally:
# client.kill_server()

View File

@ -8,13 +8,14 @@ from io import StringIO
import pytest
from sanic_testing.testing import SanicTestClient
import sanic
from sanic import Sanic
from sanic.compat import OS_IS_WINDOWS
from sanic.log import LOGGING_CONFIG_DEFAULTS, logger
from sanic.response import text
from sanic.testing import SanicTestClient
logging_format = """module: %(module)s; \

View File

@ -1,8 +1,9 @@
import asyncio
import logging
from sanic_testing.testing import PORT
from sanic.config import BASE_LOGO
from sanic.testing import PORT
try:

View File

@ -5,9 +5,10 @@ import signal
import pytest
from sanic_testing.testing import HOST, PORT
from sanic import Blueprint
from sanic.response import text
from sanic.testing import HOST, PORT
@pytest.mark.skipif(

View File

@ -16,10 +16,10 @@ from httpcore._async.connection_pool import ResponseByteStream
from httpcore._exceptions import LocalProtocolError, UnsupportedProtocol
from httpcore._types import TimeoutDict
from httpcore._utils import url_to_origin
from sanic_testing.testing import SanicTestClient
from sanic import Sanic
from sanic.response import text
from sanic.testing import SanicTestClient
class DelayableHTTPConnection(httpcore._async.connection.AsyncHTTPConnection):

View File

@ -8,11 +8,7 @@ from urllib.parse import urlparse
import pytest
from sanic import Blueprint, Sanic
from sanic.exceptions import ServerError
from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, Request, RequestParameters
from sanic.response import html, json, text
from sanic.testing import (
from sanic_testing.testing import (
ASGI_BASE_URL,
ASGI_HOST,
ASGI_PORT,
@ -21,6 +17,11 @@ from sanic.testing import (
SanicTestClient,
)
from sanic import Blueprint, Sanic
from sanic.exceptions import ServerError
from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, Request, RequestParameters
from sanic.response import html, json, text
# ------------------------------------------------------------ #
# GET

View File

@ -12,6 +12,7 @@ from urllib.parse import unquote
import pytest
from aiofiles import os as async_os
from sanic_testing.testing import HOST, PORT
from sanic.response import (
HTTPResponse,
@ -25,7 +26,6 @@ from sanic.response import (
text,
)
from sanic.server import HttpProtocol
from sanic.testing import HOST, PORT
JSON_DATA = {"ok": True}

View File

@ -2,11 +2,12 @@ import asyncio
import pytest
from sanic_testing.testing import SanicTestClient
from sanic import Sanic
from sanic.constants import HTTP_METHODS
from sanic.response import json, text
from sanic.router import ParameterNameConflicts, RouteDoesNotExist, RouteExists
from sanic.testing import SanicTestClient
# ------------------------------------------------------------ #

View File

@ -6,7 +6,7 @@ from socket import socket
import pytest
from sanic.testing import HOST, PORT
from sanic_testing.testing import HOST, PORT
AVAILABLE_LISTENERS = [

View File

@ -7,9 +7,10 @@ from unittest.mock import MagicMock
import pytest
from sanic_testing.testing import HOST, PORT
from sanic.compat import ctrlc_workaround_for_windows
from sanic.response import HTTPResponse
from sanic.testing import HOST, PORT
async def stop(app, loop):

View File

@ -1,5 +1,6 @@
from sanic_testing.testing import PORT, SanicTestClient
from sanic.response import json, text
from sanic.testing import PORT, SanicTestClient
# ------------------------------------------------------------ #

View File

@ -4,11 +4,12 @@ from urllib.parse import parse_qsl, urlsplit
import pytest as pytest
from sanic_testing.testing import HOST as test_host
from sanic_testing.testing import PORT as test_port
from sanic.blueprints import Blueprint
from sanic.exceptions import URLBuildError
from sanic.response import text
from sanic.testing import HOST as test_host
from sanic.testing import PORT as test_port
from sanic.views import HTTPMethodView

View File

@ -7,14 +7,13 @@ setenv =
{py36,py37,py38,py39,pyNightly}-no-ext: SANIC_NO_UJSON=1
{py36,py37,py38,py39,pyNightly}-no-ext: SANIC_NO_UVLOOP=1
deps =
sanic-testing
coverage==5.3
pytest==5.2.1
pytest-cov
pytest-sanic
pytest-sugar
pytest-benchmark
httpcore==0.11.*
httpx==0.15.4
chardet==3.*
beautifulsoup4
gunicorn==20.0.4