from json import JSONDecodeError from sanic import Sanic import asyncio from sanic.response import text from sanic.config import Config import aiohttp from aiohttp import TCPConnector from sanic.testing import SanicTestClient, HOST try: try: # direct use import packaging version = packaging.version except (ImportError, AttributeError): # setuptools v39.0 and above. try: from setuptools.extern import packaging except ImportError: # Before setuptools v39.0 from pkg_resources.extern import packaging version = packaging.version except ImportError: raise RuntimeError("The 'packaging' library is missing.") aiohttp_version = version.parse(aiohttp.__version__) class DelayableTCPConnector(TCPConnector): class RequestContextManager(object): def __new__(cls, req, delay): cls = super( DelayableTCPConnector.RequestContextManager, cls ).__new__(cls) cls.req = req cls.send_task = None cls.resp = None cls.orig_send = getattr(req, "send") cls.orig_start = None cls.delay = delay cls._acting_as = req return cls def __getattr__(self, item): acting_as = self._acting_as return getattr(acting_as, item) async def start(self, connection, read_until_eof=False): if self.send_task is None: raise RuntimeError("do a send() before you do a start()") resp = await self.send_task self.send_task = None self.resp = resp self._acting_as = self.resp self.orig_start = getattr(resp, "start") try: if aiohttp_version >= version.parse("3.3.0"): ret = await self.orig_start(connection) else: ret = await self.orig_start(connection, read_until_eof) except Exception as e: raise e return ret def close(self): if self.resp is not None: self.resp.close() if self.send_task is not None: self.send_task.cancel() async def delayed_send(self, *args, **kwargs): req = self.req if self.delay and self.delay > 0: # sync_sleep(self.delay) await asyncio.sleep(self.delay) t = req.loop.time() print("sending at {}".format(t), flush=True) next(iter(args)) # first arg is connection try: return await self.orig_send(*args, **kwargs) except Exception as e: if aiohttp_version < version.parse("3.1.0"): return aiohttp.ClientResponse(req.method, req.url) kw = dict( writer=None, continue100=None, timer=None, request_info=None, traces=[], loop=req.loop, session=None, ) if aiohttp_version < version.parse("3.3.0"): kw["auto_decompress"] = None return aiohttp.ClientResponse(req.method, req.url, **kw) def _send(self, *args, **kwargs): gen = self.delayed_send(*args, **kwargs) task = self.req.loop.create_task(gen) self.send_task = task self._acting_as = task return self if aiohttp_version >= version.parse("3.1.0"): # aiohttp changed the request.send method to async async def send(self, *args, **kwargs): return self._send(*args, **kwargs) else: send = _send def __init__(self, *args, **kwargs): _post_connect_delay = kwargs.pop("post_connect_delay", 0) _pre_request_delay = kwargs.pop("pre_request_delay", 0) super(DelayableTCPConnector, self).__init__(*args, **kwargs) self._post_connect_delay = _post_connect_delay self._pre_request_delay = _pre_request_delay async def connect(self, req, *args, **kwargs): d_req = DelayableTCPConnector.RequestContextManager( req, self._pre_request_delay ) conn = await super(DelayableTCPConnector, self).connect( req, *args, **kwargs ) if self._post_connect_delay and self._post_connect_delay > 0: await asyncio.sleep(self._post_connect_delay, loop=self._loop) req.send = d_req.send t = req.loop.time() print("Connected at {}".format(t), flush=True) return conn class DelayableSanicTestClient(SanicTestClient): def __init__(self, app, loop, request_delay=1): super(DelayableSanicTestClient, self).__init__(app) self._request_delay = request_delay self._loop = None async def _local_request(self, method, uri, cookies=None, *args, **kwargs): if self._loop is None: self._loop = asyncio.get_event_loop() if uri.startswith(("http:", "https:", "ftp:", "ftps://" "//")): url = uri else: url = "http://{host}:{port}{uri}".format( host=HOST, port=self.port, uri=uri ) conn = DelayableTCPConnector( pre_request_delay=self._request_delay, verify_ssl=False, loop=self._loop, ) async with aiohttp.ClientSession( cookies=cookies, connector=conn, loop=self._loop ) as session: # Insert a delay after creating the connection # But before sending the request. async with getattr(session, method.lower())( url, *args, **kwargs ) as response: try: response.text = await response.text() except UnicodeDecodeError: response.text = None try: response.json = await response.json() except ( JSONDecodeError, UnicodeDecodeError, aiohttp.ClientResponseError, ): response.json = None response.body = await response.read() return response Config.REQUEST_TIMEOUT = 0.6 request_timeout_default_app = Sanic("test_request_timeout_default") request_no_timeout_app = Sanic("test_request_no_timeout") @request_timeout_default_app.route("/1") async def handler1(request): return text("OK") @request_no_timeout_app.route("/1") async def handler2(request): return text("OK") @request_timeout_default_app.websocket("/ws1") async def ws_handler1(request, ws): await ws.send("OK") def test_default_server_error_request_timeout(): client = DelayableSanicTestClient(request_timeout_default_app, None, 2) request, response = client.get("/1") assert response.status == 408 assert response.text == "Error: Request Timeout" def test_default_server_error_request_dont_timeout(): client = DelayableSanicTestClient(request_no_timeout_app, None, 0.2) request, response = client.get("/1") assert response.status == 200 assert response.text == "OK" def test_default_server_error_websocket_request_timeout(): headers = { "Upgrade": "websocket", "Connection": "upgrade", "Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==", "Sec-WebSocket-Version": "13", } client = DelayableSanicTestClient(request_timeout_default_app, None, 2) request, response = client.get("/ws1", headers=headers) assert response.status == 408 assert response.text == "Error: Request Timeout"