diff --git a/environment.yml b/environment.yml index e9b832b4..e42a97ef 100644 --- a/environment.yml +++ b/environment.yml @@ -13,7 +13,7 @@ dependencies: - sphinx==1.8.3 - sphinx_rtd_theme==0.4.2 - recommonmark==0.5.0 - - requests-async==0.4.0 + - requests-async==0.5.0 - sphinxcontrib-asyncio>=0.2.0 - docutils==0.14 - pygments==2.3.1 diff --git a/setup.py b/setup.py index 2cd973b7..861ce013 100644 --- a/setup.py +++ b/setup.py @@ -89,8 +89,8 @@ tests_require = [ "multidict>=4.0,<5.0", "gunicorn", "pytest-cov", - "httpcore==0.1.1", - "requests-async==0.4.0", + "httpcore==0.3.0", + "requests-async==0.5.0", "beautifulsoup4", uvloop, ujson, diff --git a/tests/test_keep_alive_timeout.py b/tests/test_keep_alive_timeout.py index 1d6de63e..c6fc0831 100644 --- a/tests/test_keep_alive_timeout.py +++ b/tests/test_keep_alive_timeout.py @@ -16,45 +16,28 @@ from sanic.response import text from sanic.testing import HOST, PORT, SanicTestClient -# import traceback - - - - - - CONFIG_FOR_TESTS = {"KEEP_ALIVE_TIMEOUT": 2, "KEEP_ALIVE": True} old_conn = None class ReusableSanicConnectionPool(httpcore.ConnectionPool): - async def acquire_connection(self, url, ssl, timeout): + async def acquire_connection(self, origin): global old_conn - if timeout.connect_timeout and not timeout.pool_timeout: - timeout.pool_timeout = timeout.connect_timeout - key = (url.scheme, url.hostname, url.port, ssl, timeout) - try: - connection = self._keepalive_connections[key].pop() - if not self._keepalive_connections[key]: - del self._keepalive_connections[key] - self.num_keepalive_connections -= 1 - self.num_active_connections += 1 + connection = self.active_connections.pop_by_origin(origin, http2_only=True) + if connection is None: + connection = self.keepalive_connections.pop_by_origin(origin) - except (KeyError, IndexError): - ssl_context = await self.get_ssl_context(url, ssl) - try: - await asyncio.wait_for( - self._max_connections.acquire(), timeout.pool_timeout - ) - except asyncio.TimeoutError: - raise PoolTimeout() - release = functools.partial(self.release_connection, key=key) - connection = httpcore.connections.Connection( - timeout=timeout, on_release=release + if connection is None: + await self.max_connections.acquire() + connection = httpcore.HTTPConnection( + origin, + ssl=self.ssl, + timeout=self.timeout, + backend=self.backend, + release_func=self.release_connection, ) - self.num_active_connections += 1 - await connection.open(url.hostname, url.port, ssl=ssl_context) + self.active_connections.add(connection) if old_conn is not None: if old_conn != connection: @@ -70,62 +53,6 @@ class ReusableSanicAdapter(requests.adapters.HTTPAdapter): def __init__(self): self.pool = ReusableSanicConnectionPool() - async def send( - self, - request, - stream=False, - timeout=None, - verify=True, - cert=None, - proxies=None, - ): - - method = request.method - url = request.url - headers = [ - (_encode(k), _encode(v)) for k, v in request.headers.items() - ] - - if not request.body: - body = b"" - elif isinstance(request.body, str): - body = _encode(request.body) - else: - body = request.body - - if isinstance(timeout, tuple): - timeout_kwargs = { - "connect_timeout": timeout[0], - "read_timeout": timeout[1], - } - else: - timeout_kwargs = { - "connect_timeout": timeout, - "read_timeout": timeout, - } - - ssl = httpcore.SSLConfig(cert=cert, verify=verify) - timeout = httpcore.TimeoutConfig(**timeout_kwargs) - - try: - response = await self.pool.request( - method, - url, - headers=headers, - body=body, - stream=stream, - ssl=ssl, - timeout=timeout, - ) - except (httpcore.BadResponse, socket.error) as err: - raise ConnectionError(err, request=request) - except httpcore.ConnectTimeout as err: - raise requests.exceptions.ConnectTimeout(err, request=request) - except httpcore.ReadTimeout as err: - raise requests.exceptions.ReadTimeout(err, request=request) - - return self.build_response(request, response) - class ResusableSanicSession(requests.Session): def __init__(self, *args, **kwargs) -> None: @@ -153,13 +80,14 @@ class ReuseableSanicTestClient(SanicTestClient): uri="/", gather_request=True, debug=False, - server_kwargs={"return_asyncio_server": True}, + server_kwargs=None, *request_args, **request_kwargs, ): loop = self._loop results = [None, None] exceptions = [] + server_kwargs = server_kwargs or {"return_asyncio_server": True} if gather_request: def _collect_request(request): @@ -187,7 +115,6 @@ class ReuseableSanicTestClient(SanicTestClient): ) results[-1] = response except Exception as e2: - # traceback.print_tb(e2.__traceback__) exceptions.append(e2) if self._server is not None: @@ -205,7 +132,6 @@ class ReuseableSanicTestClient(SanicTestClient): loop._stopping = False _server = loop.run_until_complete(_server_co) except Exception as e1: - # traceback.print_tb(e1.__traceback__) raise e1 self._server = _server server.trigger_events(self.app.listeners["after_server_start"], loop) @@ -257,36 +183,32 @@ class ReuseableSanicTestClient(SanicTestClient): request_keepalive = kwargs.pop( "request_keepalive", CONFIG_FOR_TESTS["KEEP_ALIVE_TIMEOUT"] ) - if self._session: - _session = self._session - else: - _session = ResusableSanicSession() - self._session = _session - async with _session as session: - try: - response = await getattr(session, method.lower())( - url, - verify=False, - timeout=request_keepalive, - *args, - **kwargs, - ) - except NameError: - raise Exception(response.status_code) + if not self._session: + self._session = ResusableSanicSession() + try: + response = await getattr(self._session, method.lower())( + url, + verify=False, + timeout=request_keepalive, + *args, + **kwargs, + ) + except NameError: + raise Exception(response.status_code) - try: - response.json = response.json() - except (JSONDecodeError, UnicodeDecodeError): - response.json = None + try: + response.json = response.json() + except (JSONDecodeError, UnicodeDecodeError): + response.json = None - response.body = await response.read() - response.status = response.status_code - response.content_type = response.headers.get("content-type") + response.body = await response.read() + response.status = response.status_code + response.content_type = response.headers.get("content-type") - if raw_cookies: - response.raw_cookies = {} - for cookie in response.cookies: - response.raw_cookies[cookie.name] = cookie + if raw_cookies: + response.raw_cookies = {} + for cookie in response.cookies: + response.raw_cookies[cookie.name] = cookie return response @@ -319,42 +241,46 @@ def test_keep_alive_timeout_reuse(): """If the server keep-alive timeout and client keep-alive timeout are both longer than the delay, the client _and_ server will successfully reuse the existing connection.""" - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - client = ReuseableSanicTestClient(keep_alive_timeout_app_reuse, loop) - headers = {"Connection": "keep-alive"} - request, response = client.get("/1", headers=headers) - assert response.status == 200 - assert response.text == "OK" - loop.run_until_complete(aio_sleep(1)) - request, response = client.get("/1") - assert response.status == 200 - assert response.text == "OK" - client.kill_server() + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + client = ReuseableSanicTestClient(keep_alive_timeout_app_reuse, loop) + headers = {"Connection": "keep-alive"} + request, response = client.get("/1", headers=headers) + assert response.status == 200 + assert response.text == "OK" + loop.run_until_complete(aio_sleep(1)) + request, response = client.get("/1") + assert response.status == 200 + assert response.text == "OK" + finally: + client.kill_server() def test_keep_alive_client_timeout(): """If the server keep-alive timeout is longer than the client keep-alive timeout, client will try to create a new connection here.""" - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - client = ReuseableSanicTestClient(keep_alive_app_client_timeout, loop) - headers = {"Connection": "keep-alive"} try: - request, response = client.get( - "/1", headers=headers, request_keepalive=1 - ) - assert response.status == 200 - assert response.text == "OK" - loop.run_until_complete(aio_sleep(2)) - exception = None - request, response = client.get("/1", request_keepalive=1) - except ValueError as e: - exception = e - assert exception is not None - assert isinstance(exception, ValueError) - assert "got a new connection" in exception.args[0] - client.kill_server() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + client = ReuseableSanicTestClient(keep_alive_app_client_timeout, loop) + headers = {"Connection": "keep-alive"} + try: + request, response = client.get( + "/1", headers=headers, request_keepalive=1 + ) + assert response.status == 200 + assert response.text == "OK" + loop.run_until_complete(aio_sleep(2)) + exception = None + request, response = client.get("/1", request_keepalive=1) + except ValueError as e: + exception = e + assert exception is not None + assert isinstance(exception, ValueError) + assert "got a new connection" in exception.args[0] + finally: + client.kill_server() def test_keep_alive_server_timeout(): @@ -362,25 +288,27 @@ def test_keep_alive_server_timeout(): keep-alive timeout, the client will either a 'Connection reset' error _or_ a new connection. Depending on how the event-loop handles the broken server connection.""" - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - client = ReuseableSanicTestClient(keep_alive_app_server_timeout, loop) - headers = {"Connection": "keep-alive"} try: - request, response = client.get( - "/1", headers=headers, request_keepalive=60 + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + client = ReuseableSanicTestClient(keep_alive_app_server_timeout, loop) + headers = {"Connection": "keep-alive"} + try: + request, response = client.get( + "/1", headers=headers, request_keepalive=60 + ) + assert response.status == 200 + assert response.text == "OK" + loop.run_until_complete(aio_sleep(3)) + exception = None + request, response = client.get("/1", request_keepalive=60) + except ValueError as e: + exception = e + assert exception is not None + assert isinstance(exception, ValueError) + assert ( + "Connection reset" in exception.args[0] + or "got a new connection" in exception.args[0] ) - assert response.status == 200 - assert response.text == "OK" - loop.run_until_complete(aio_sleep(3)) - exception = None - request, response = client.get("/1", request_keepalive=60) - except ValueError as e: - exception = e - assert exception is not None - assert isinstance(exception, ValueError) - assert ( - "Connection reset" in exception.args[0] - or "got a new connection" in exception.args[0] - ) - client.kill_server() + finally: + client.kill_server() diff --git a/tests/test_request_stream.py b/tests/test_request_stream.py index c1457c8f..d845dc85 100644 --- a/tests/test_request_stream.py +++ b/tests/test_request_stream.py @@ -71,15 +71,13 @@ def test_request_stream_app(app): @app.post("/post/", stream=True) async def post(request, id): assert isinstance(request.stream, StreamBuffer) - - async def streaming(response): - while True: - body = await request.stream.read() - if body is None: - break - await response.write(body.decode("utf-8")) - - return stream(streaming) + result = "" + while True: + body = await request.stream.read() + if body is None: + break + result += body.decode("utf-8") + return text(result) @app.put("/_put") async def _put(request): @@ -89,15 +87,13 @@ def test_request_stream_app(app): @app.put("/put", stream=True) async def put(request): assert isinstance(request.stream, StreamBuffer) - - async def streaming(response): - while True: - body = await request.stream.read() - if body is None: - break - await response.write(body.decode("utf-8")) - - return stream(streaming) + result = "" + while True: + body = await request.stream.read() + if body is None: + break + result += body.decode("utf-8") + return text(result) @app.patch("/_patch") async def _patch(request): @@ -107,15 +103,14 @@ def test_request_stream_app(app): @app.patch("/patch", stream=True) async def patch(request): assert isinstance(request.stream, StreamBuffer) + result = "" + while True: + body = await request.stream.read() + if body is None: + break + result += body.decode("utf-8") + return text(result) - async def streaming(response): - while True: - body = await request.stream.read() - if body is None: - break - await response.write(body.decode("utf-8")) - - return stream(streaming) assert app.is_request_stream is True @@ -166,15 +161,13 @@ def test_request_stream_handle_exception(app): @app.post("/post/", stream=True) async def post(request, id): assert isinstance(request.stream, StreamBuffer) - - async def streaming(response): - while True: - body = await request.stream.read() - if body is None: - break - await response.write(body.decode("utf-8")) - - return stream(streaming) + result = "" + while True: + body = await request.stream.read() + if body is None: + break + result += body.decode("utf-8") + return text(result) # 404 request, response = app.test_client.post("/in_valid_post", data=data) @@ -222,15 +215,13 @@ def test_request_stream_blueprint(app): @bp.post("/post/", stream=True) async def post(request, id): assert isinstance(request.stream, StreamBuffer) - - async def streaming(response): - while True: - body = await request.stream.read() - if body is None: - break - await response.write(body.decode("utf-8")) - - return stream(streaming) + result = "" + while True: + body = await request.stream.read() + if body is None: + break + result += body.decode("utf-8") + return text(result) @bp.put("/_put") async def _put(request): @@ -240,15 +231,13 @@ def test_request_stream_blueprint(app): @bp.put("/put", stream=True) async def put(request): assert isinstance(request.stream, StreamBuffer) - - async def streaming(response): - while True: - body = await request.stream.read() - if body is None: - break - await response.write(body.decode("utf-8")) - - return stream(streaming) + result = "" + while True: + body = await request.stream.read() + if body is None: + break + result += body.decode("utf-8") + return text(result) @bp.patch("/_patch") async def _patch(request): @@ -258,27 +247,23 @@ def test_request_stream_blueprint(app): @bp.patch("/patch", stream=True) async def patch(request): assert isinstance(request.stream, StreamBuffer) - - async def streaming(response): - while True: - body = await request.stream.read() - if body is None: - break - await response.write(body.decode("utf-8")) - - return stream(streaming) + result = "" + while True: + body = await request.stream.read() + if body is None: + break + result += body.decode("utf-8") + return text(result) async def post_add_route(request): assert isinstance(request.stream, StreamBuffer) - - async def streaming(response): - while True: - body = await request.stream.read() - if body is None: - break - await response.write(body.decode("utf-8")) - - return stream(streaming) + result = "" + while True: + body = await request.stream.read() + if body is None: + break + result += body.decode("utf-8") + return text(result) bp.add_route( post_add_route, "/post/add_route", methods=["POST"], stream=True @@ -388,15 +373,13 @@ def test_request_stream(app): @app.post("/stream", stream=True) async def handler(request): assert isinstance(request.stream, StreamBuffer) - - async def streaming(response): - while True: - body = await request.stream.read() - if body is None: - break - await response.write(body.decode("utf-8")) - - return stream(streaming) + result = "" + while True: + body = await request.stream.read() + if body is None: + break + result += body.decode("utf-8") + return text(result) @app.get("/get") async def get(request): diff --git a/tests/test_request_timeout.py b/tests/test_request_timeout.py index e59f2d2f..3a41e462 100644 --- a/tests/test_request_timeout.py +++ b/tests/test_request_timeout.py @@ -13,37 +13,26 @@ class DelayableSanicConnectionPool(httpcore.ConnectionPool): self._request_delay = request_delay super().__init__(*args, **kwargs) - async def request( + async def send( self, - method, - url, - headers=(), - body=b"", + request, stream=False, ssl=None, timeout=None, ): - if ssl is None: - ssl = self.ssl_config - if timeout is None: - timeout = self.timeout - - parsed_url = httpcore.URL(url) - request = httpcore.Request( - method, parsed_url, headers=headers, body=body - ) - connection = await self.acquire_connection( - parsed_url, ssl=ssl, timeout=timeout - ) + connection = await self.acquire_connection(request.url.origin) + if connection.h11_connection is None and connection.h2_connection is None: + await connection.connect(ssl=ssl, timeout=timeout) if self._request_delay: - print(f"\t>> Sleeping ({self._request_delay})") await asyncio.sleep(self._request_delay) - response = await connection.send(request) - if not stream: - try: - await response.read() - finally: - await response.close() + try: + response = await connection.send( + request, stream=stream, ssl=ssl, timeout=timeout + ) + except BaseException as exc: + self.active_connections.remove(connection) + self.max_connections.release() + raise exc return response diff --git a/tox.ini b/tox.ini index c825f0de..616b7acd 100644 --- a/tox.ini +++ b/tox.ini @@ -12,8 +12,8 @@ deps = pytest-cov pytest-sanic pytest-sugar - httpcore==0.1.1 - requests-async==0.4.0 + httpcore==0.3.0 + requests-async==0.5.0 chardet<=2.3.0 beautifulsoup4 gunicorn