Swap out requests-async for httpx (#1728)
* Begin swap of requests-async for httpx * Finalize httpx adoption and resolve tests Resolve linting and formatting * Remove documentation references to requests-async in favor of httpx
This commit is contained in:
@ -2,7 +2,7 @@ Testing
Sanic endpoints can be tested locally using the `test_client` object, which
depends on the additional `requests-async <https://github.com/encode/requests-async>`_
depends on an additional package: `httpx <https://www.encode.io/httpx/>`_
library, which implements an API that mirrors the `requests` library.
The `test_client` exposes `get`, `post`, `put`, `delete`, `patch`, `head` and `options` methods
@ -22,7 +22,7 @@ for you to run against your application. A simple example (using pytest) is like
assert response.status == 405
Internally, each time you call one of the `test_client` methods, the Sanic app is run at `` and
your test request is executed against your application, using `requests-async`.
your test request is executed against your application, using `httpx`.
The `test_client` methods accept the following arguments and keyword arguments:
@ -55,8 +55,8 @@ And to supply data to a JSON POST request:
assert request.json.get('key1') == 'value1'
More information about
the available arguments to `requests-async` can be found
[in the documentation for `requests <https://2.python-requests.org/en/master/>`_.
the available arguments to `httpx` can be found
[in the documentation for `httpx <https://www.encode.io/httpx/>`_.
Using a random port
@ -13,7 +13,7 @@ dependencies:
- sphinx==1.8.3
- sphinx_rtd_theme==0.4.2
- recommonmark==0.5.0
- requests-async==0.5.0
- httpx==0.9.3
- sphinxcontrib-asyncio>=0.2.0
- docutils==0.14
- pygments==2.3.1
@ -15,8 +15,6 @@ from typing import (
from urllib.parse import quote
from requests_async import ASGISession # type: ignore
import sanic.app # noqa
from sanic.compat import Header
@ -189,7 +187,7 @@ class Lifespan:
class ASGIApp:
sanic_app: Union[ASGISession, "sanic.app.Sanic"]
sanic_app: "sanic.app.Sanic"
request: Request
transport: MockTransport
do_stream: bool
@ -223,8 +221,13 @@ class ASGIApp:
if scope["type"] == "lifespan":
await instance.lifespan(scope, receive, send)
url_bytes = scope.get("root_path", "") + quote(scope["path"])
url_bytes = url_bytes.encode("latin-1")
path = (
if scope["path"].startswith("/")
else scope["path"]
url = "/".join([scope.get("root_path", ""), quote(path)])
url_bytes = url.encode("latin-1")
url_bytes += b"?" + scope["query_string"]
if scope["type"] == "http":
@ -1,14 +1,8 @@
import asyncio
import types
import typing
from json import JSONDecodeError
from socket import socket
from urllib.parse import unquote, urlsplit
import httpcore # type: ignore
import requests_async as requests # type: ignore
import websockets # type: ignore
import httpx
import websockets
from sanic.asgi import ASGIApp
from sanic.exceptions import MethodNotSupported
@ -29,7 +23,7 @@ class SanicTestClient:
self.host = host
def get_new_session(self):
return requests.Session()
return httpx.Client()
async def _local_request(self, method, url, *args, **kwargs):
@ -60,7 +54,8 @@ class SanicTestClient:
if raw_cookies:
response.raw_cookies = {}
for cookie in response.cookies:
for cookie in response.cookies.jar:
response.raw_cookies[cookie.name] = cookie
return response
@ -179,181 +174,6 @@ class SanicTestClient:
return self._sanic_endpoint_test("websocket", *args, **kwargs)
class SanicASGIAdapter(requests.asgi.ASGIAdapter): # noqa
async def send( # type: ignore
request: requests.PreparedRequest,
gather_return: bool = False,
*args: typing.Any,
**kwargs: typing.Any,
) -> requests.Response:
"""This method is taken MOSTLY verbatim from requests-asyn. The
difference is the capturing of a response on the ASGI call and then
returning it on the response object. This is implemented to achieve:
request, response = await app.asgi_client.get("/")
You can see the original code here:
https://github.com/encode/requests-async/blob/614f40f77f19e6c6da8a212ae799107b0384dbf9/requests_async/asgi.py#L51""" # noqa
scheme, netloc, path, query, fragment = urlsplit(
) # type: ignore
default_port = {"http": 80, "ws": 80, "https": 443, "wss": 443}[scheme]
if ":" in netloc:
host, port_string = netloc.split(":", 1)
port = int(port_string)
host = netloc
port = default_port
# Include the 'host' header.
if "host" in request.headers:
headers = [] # type: typing.List[typing.Tuple[bytes, bytes]]
elif port == default_port:
headers = [(b"host", host.encode())]
headers = [(b"host", (f"{host}:{port}").encode())]
# Include other request headers.
headers += [
(key.lower().encode(), value.encode())
for key, value in request.headers.items()
no_response = False
if scheme in {"ws", "wss"}:
subprotocol = request.headers.get("sec-websocket-protocol", None)
if subprotocol is None:
subprotocols = [] # type: typing.Sequence[str]
subprotocols = [
value.strip() for value in subprotocol.split(",")
scope = {
"type": "websocket",
"path": unquote(path),
"root_path": "",
"scheme": scheme,
"query_string": query.encode(),
"headers": headers,
"client": ["testclient", 50000],
"server": [host, port],
"subprotocols": subprotocols,
no_response = True
scope = {
"type": "http",
"http_version": "1.1",
"method": request.method,
"path": unquote(path),
"root_path": "",
"scheme": scheme,
"query_string": query.encode(),
"headers": headers,
"client": ["testclient", 50000],
"server": [host, port],
"extensions": {"http.response.template": {}},
async def receive():
nonlocal request_complete, response_complete
if request_complete:
while not response_complete:
await asyncio.sleep(0.0001)
return {"type": "http.disconnect"}
body = request.body
if isinstance(body, str):
body_bytes = body.encode("utf-8") # type: bytes
elif body is None:
body_bytes = b""
elif isinstance(body, types.GeneratorType):
chunk = body.send(None)
if isinstance(chunk, str):
chunk = chunk.encode("utf-8")
return {
"type": "http.request",
"body": chunk,
"more_body": True,
except StopIteration:
request_complete = True
return {"type": "http.request", "body": b""}
body_bytes = body
request_complete = True
return {"type": "http.request", "body": body_bytes}
request_complete = False
response_started = False
response_complete = False
raw_kwargs = {"content": b""} # type: typing.Dict[str, typing.Any]
template = None
context = None
return_value = None
async def send(message) -> None:
nonlocal raw_kwargs, response_started, response_complete, template, context # noqa
if message["type"] == "http.response.start":
assert (
not response_started
), 'Received multiple "http.response.start" messages.'
raw_kwargs["status_code"] = message["status"]
raw_kwargs["headers"] = message["headers"]
response_started = True
elif message["type"] == "http.response.body":
assert response_started, (
'Received "http.response.body" '
'without "http.response.start".'
assert (
not response_complete
), 'Received "http.response.body" after response completed.'
body = message.get("body", b"")
more_body = message.get("more_body", False)
if request.method != "HEAD":
raw_kwargs["content"] += body
if not more_body:
response_complete = True
elif message["type"] == "http.response.template":
template = message["template"]
context = message["context"]
return_value = await self.app(scope, receive, send)
except BaseException as exc:
if not self.suppress_exceptions:
raise exc from None
if no_response:
response_started = True
raw_kwargs = {"status_code": 204, "headers": []}
if not self.suppress_exceptions:
assert response_started, "TestClient did not receive any response."
elif not response_started:
raw_kwargs = {"status_code": 500, "headers": []}
raw = httpcore.Response(**raw_kwargs)
response = self.build_response(request, raw)
if template is not None:
response.template = template
response.context = context
if gather_return:
response.return_value = return_value
return response
class TestASGIApp(ASGIApp):
async def __call__(self):
await super().__call__()
@ -365,7 +185,11 @@ async def app_call_with_return(self, scope, receive, send):
return await asgi_app()
class SanicASGITestClient(requests.ASGISession):
class SanicASGIDispatch(httpx.dispatch.ASGIDispatch):
class SanicASGITestClient(httpx.Client):
def __init__(
@ -374,18 +198,18 @@ class SanicASGITestClient(requests.ASGISession):
) -> None:
app.__class__.__call__ = app_call_with_return
app.asgi = True
adapter = SanicASGIAdapter(
app, suppress_exceptions=suppress_exceptions
self.mount("http://", adapter)
self.mount("https://", adapter)
self.mount("ws://", adapter)
self.mount("wss://", adapter)
self.headers.update({"user-agent": "testclient"})
self.app = app
self.base_url = base_url
dispatch = SanicASGIDispatch(app=app, client=(ASGI_HOST, PORT))
super().__init__(dispatch=dispatch, base_url=base_url)
self.last_request = None
def _collect_request(request):
self.last_request = request
async def request(self, method, url, gather_request=True, *args, **kwargs):
@ -395,33 +219,39 @@ class SanicASGITestClient(requests.ASGISession):
response.body = response.content
response.content_type = response.headers.get("content-type")
if hasattr(response, "return_value"):
request = response.return_value
del response.return_value
return request, response
return response
def merge_environment_settings(self, *args, **kwargs):
settings = super().merge_environment_settings(*args, **kwargs)
settings.update({"gather_return": self.gather_request})
return settings
return self.last_request, response
async def websocket(self, uri, subprotocols=None, *args, **kwargs):
if uri.startswith(("ws:", "wss:")):
url = uri
uri = uri if uri.startswith("/") else "/{uri}".format(uri=uri)
url = "ws://testserver{uri}".format(uri=uri)
scheme = "ws"
path = uri
root_path = "{}://{}".format(scheme, ASGI_HOST)
headers = kwargs.get("headers", {})
headers.setdefault("connection", "upgrade")
headers.setdefault("sec-websocket-key", "testserver==")
headers.setdefault("sec-websocket-version", "13")
if subprotocols is not None:
"sec-websocket-protocol", ", ".join(subprotocols)
kwargs["headers"] = headers
headers = kwargs.get("headers", {})
headers.setdefault("connection", "upgrade")
headers.setdefault("sec-websocket-key", "testserver==")
headers.setdefault("sec-websocket-version", "13")
if subprotocols is not None:
"sec-websocket-protocol", ", ".join(subprotocols)
return await self.request("websocket", url, **kwargs)
scope = {
"type": "websocket",
"asgi": {"version": "3.0"},
"http_version": "1.1",
"headers": [map(lambda y: y.encode(), x) for x in headers.items()],
"scheme": scheme,
"root_path": root_path,
"path": path,
"query_string": b"",
async def receive():
return {}
async def send(message):
await self.app(scope, receive, send)
return None, {}
@ -5,6 +5,7 @@ import codecs
import os
import re
import sys
from distutils.util import strtobool
from setuptools import setup
@ -83,7 +84,7 @@ requirements = [
tests_require = [
@ -1,6 +1,6 @@
import asyncio
from collections import deque
from collections import deque, namedtuple
import pytest
import uvicorn
@ -245,17 +245,26 @@ async def test_cookie_customization(app):
return response
_, response = await app.asgi_client.get("/cookie")
CookieDef = namedtuple("CookieDef", ("value", "httponly"))
Cookie = namedtuple("Cookie", ("domain", "path", "value", "httponly"))
cookie_map = {
"test": {"value": "Cookie1", "HttpOnly": True},
"c2": {"value": "Cookie2", "HttpOnly": False},
"test": CookieDef("Cookie1", True),
"c2": CookieDef("Cookie2", False),
for k, v in (
assert cookie_map.get(k).get("value") == v.value
if cookie_map.get(k).get("HttpOnly"):
assert "HttpOnly" in v._rest.keys()
cookies = {
c.name: Cookie(c.domain, c.path, c.value, "HttpOnly" in c._rest.keys())
for c in response.cookies.jar
for name, definition in cookie_map.items():
cookie = cookies.get(name)
assert cookie
assert cookie.value == definition.value
assert cookie.domain == "mockserver.local"
assert cookie.path == "/"
assert cookie.httponly == definition.httponly
@ -1,15 +1,9 @@
import asyncio
import functools
import socket
from asyncio import sleep as aio_sleep
from http.client import _encode
from json import JSONDecodeError
import httpcore
import requests_async as requests
from httpcore import PoolTimeout
import httpx
from sanic import Sanic, server
from sanic.response import text
@ -21,24 +15,28 @@ CONFIG_FOR_TESTS = {"KEEP_ALIVE_TIMEOUT": 2, "KEEP_ALIVE": True}
old_conn = None
class ReusableSanicConnectionPool(httpcore.ConnectionPool):
async def acquire_connection(self, origin):
class ReusableSanicConnectionPool(
async def acquire_connection(self, origin, timeout):
global old_conn
connection = self.active_connections.pop_by_origin(
origin, http2_only=True
if connection is None:
connection = self.keepalive_connections.pop_by_origin(origin)
connection = self.pop_connection(origin)
if connection is None:
await self.max_connections.acquire()
connection = httpcore.HTTPConnection(
pool_timeout = None if timeout is None else timeout.pool_timeout
await self.max_connections.acquire(timeout=pool_timeout)
connection = httpx.dispatch.connection.HTTPConnection(
if old_conn is not None:
@ -51,17 +49,10 @@ class ReusableSanicConnectionPool(httpcore.ConnectionPool):
return connection
class ReusableSanicAdapter(requests.adapters.HTTPAdapter):
def __init__(self):
self.pool = ReusableSanicConnectionPool()
class ResusableSanicSession(requests.Session):
class ResusableSanicSession(httpx.Client):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
adapter = ReusableSanicAdapter()
self.mount("http://", adapter)
self.mount("https://", adapter)
dispatch = ReusableSanicConnectionPool()
super().__init__(dispatch=dispatch, *args, **kwargs)
class ReuseableSanicTestClient(SanicTestClient):
@ -74,6 +65,9 @@ class ReuseableSanicTestClient(SanicTestClient):
self._tcp_connector = None
self._session = None
def get_new_session(self):
return ResusableSanicSession()
# Copied from SanicTestClient, but with some changes to reuse the
# same loop for the same app.
def _sanic_endpoint_test(
@ -167,7 +161,6 @@ class ReuseableSanicTestClient(SanicTestClient):
self._server = None
if self._session:
@ -186,7 +179,7 @@ class ReuseableSanicTestClient(SanicTestClient):
if not self._session:
self._session = ResusableSanicSession()
self._session = self.get_new_session()
response = await getattr(self._session, method.lower())(
url, verify=False, timeout=request_keepalive, *args, **kwargs
@ -2,6 +2,7 @@ import io
from sanic.response import text
data = "abc" * 10_000_000
@ -332,7 +332,7 @@ def test_request_stream_handle_exception(app):
assert response.text == "Error: Requested URL /in_valid_post not found"
# 405
request, response = app.test_client.get("/post/random_id", data=data)
request, response = app.test_client.get("/post/random_id")
assert response.status == 405
assert (
response.text == "Error: Method GET not allowed for URL"
@ -1,49 +1,70 @@
import asyncio
import httpcore
import requests_async as requests
import httpx
from sanic import Sanic
from sanic.response import text
from sanic.testing import SanicTestClient
class DelayableSanicConnectionPool(httpcore.ConnectionPool):
class DelayableHTTPConnection(httpx.dispatch.connection.HTTPConnection):
def __init__(self, *args, **kwargs):
self._request_delay = None
if "request_delay" in kwargs:
self._request_delay = kwargs.pop("request_delay")
super().__init__(*args, **kwargs)
async def send(self, request, verify=None, cert=None, timeout=None):
if self.h11_connection is None and self.h2_connection is None:
await self.connect(verify=verify, cert=cert, timeout=timeout)
if self._request_delay:
await asyncio.sleep(self._request_delay)
if self.h2_connection is not None:
response = await self.h2_connection.send(request, timeout=timeout)
assert self.h11_connection is not None
response = await self.h11_connection.send(request, timeout=timeout)
return response
class DelayableSanicConnectionPool(
def __init__(self, request_delay=None, *args, **kwargs):
self._request_delay = request_delay
super().__init__(*args, **kwargs)
async def send(self, request, stream=False, ssl=None, timeout=None):
connection = await self.acquire_connection(request.url.origin)
if (
connection.h11_connection is None
and connection.h2_connection is None
await connection.connect(ssl=ssl, timeout=timeout)
if self._request_delay:
await asyncio.sleep(self._request_delay)
response = await connection.send(
request, stream=stream, ssl=ssl, timeout=timeout
async def acquire_connection(self, origin, timeout=None):
connection = self.pop_connection(origin)
if connection is None:
pool_timeout = None if timeout is None else timeout.pool_timeout
await self.max_connections.acquire(timeout=pool_timeout)
connection = DelayableHTTPConnection(
except BaseException as exc:
raise exc
return response
return connection
class DelayableSanicAdapter(requests.adapters.HTTPAdapter):
def __init__(self, request_delay=None):
self.pool = DelayableSanicConnectionPool(request_delay=request_delay)
class DelayableSanicSession(requests.Session):
class DelayableSanicSession(httpx.Client):
def __init__(self, request_delay=None, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
adapter = DelayableSanicAdapter(request_delay=request_delay)
self.mount("http://", adapter)
self.mount("https://", adapter)
dispatch = DelayableSanicConnectionPool(request_delay=request_delay)
super().__init__(dispatch=dispatch, *args, **kwargs)
class DelayableSanicTestClient(SanicTestClient):
@ -10,7 +10,7 @@ import pytest
from sanic import Blueprint, Sanic
from sanic.exceptions import ServerError
from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, RequestParameters
from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, Request, RequestParameters
from sanic.response import json, text
from sanic.testing import ASGI_HOST, HOST, PORT
@ -55,11 +55,11 @@ def test_ip(app):
async def test_ip_asgi(app):
def handler(request):
return text("{}".format(request.ip))
return text("{}".format(request.url))
request, response = await app.asgi_client.get("/")
assert response.text == "mockserver"
assert response.text == "http://mockserver/"
def test_text(app):
@ -207,24 +207,24 @@ async def test_empty_json_asgi(app):
def test_invalid_json(app):
async def handler(request):
return json(request.json)
data = "I am not json"
request, response = app.test_client.get("/", data=data)
request, response = app.test_client.post("/", data=data)
assert response.status == 400
async def test_invalid_json_asgi(app):
async def handler(request):
return json(request.json)
data = "I am not json"
request, response = await app.asgi_client.get("/", data=data)
request, response = await app.asgi_client.post("/", data=data)
assert response.status == 400
@ -1807,26 +1807,6 @@ def test_request_port(app):
assert hasattr(request, "_port")
async def test_request_port_asgi(app):
def handler(request):
return text("OK")
request, response = await app.asgi_client.get("/")
port = request.port
assert isinstance(port, int)
delattr(request, "_socket")
delattr(request, "_port")
port = request.port
assert isinstance(port, int)
assert hasattr(request, "_socket")
assert hasattr(request, "_port")
def test_request_socket(app):
def handler(request):
@ -37,14 +37,14 @@ def skip_test_utf8_route(app):
def test_utf8_post_json(app):
async def handler(request):
return text("OK")
payload = {"test": "✓"}
headers = {"content-type": "application/json"}
request, response = app.test_client.get(
request, response = app.test_client.post(
"/", data=json_dumps(payload), headers=headers
Reference in New Issue
Block a user