# sanic/tests/test_request_timeout.py
# 2018-03-15 21:28:52 -07:00
#
# 176 lines
# 6.3 KiB
# Python

from json import JSONDecodeError
from sanic import Sanic
import asyncio
from sanic.response import text
from sanic.config import Config
import aiohttp
from aiohttp import TCPConnector
from sanic.testing import SanicTestClient, HOST, PORT
class DelayableTCPConnector(TCPConnector):
    """``TCPConnector`` that can inject delays around connecting and sending.

    Two extra keyword arguments are consumed before the remaining arguments
    are handed to ``TCPConnector.__init__``:

    * ``post_connect_delay`` -- seconds to sleep once the connection has been
      established (default ``0``, i.e. no delay).
    * ``pre_request_delay`` -- seconds to sleep before the request is sent on
      the established connection (default ``0``, i.e. no delay).
    """

    class RequestContextManager(object):
        """Wrapper around a request object that postpones its ``send``.

        Attribute lookups that are not satisfied by the wrapper itself are
        forwarded to ``_acting_as``: the wrapped request at first, the
        pending send task after ``send()``, and the response once ``start()``
        has awaited that task.
        """

        def __new__(cls, req, delay):
            """Create the wrapper for ``req`` with a send delay of ``delay``.

            ``delay`` is passed to ``asyncio.sleep`` and is therefore in
            seconds.
            """
            instance = super(
                DelayableTCPConnector.RequestContextManager, cls
            ).__new__(cls)
            instance.req = req
            instance.send_task = None
            instance.resp = None
            # Keep the real ``send`` so it can be invoked after the delay.
            instance.orig_send = getattr(req, 'send')
            instance.orig_start = None
            instance.delay = delay
            instance._acting_as = req
            return instance

        def __getattr__(self, item):
            """Delegate unknown attributes to the object currently wrapped."""
            return getattr(self._acting_as, item)

        async def start(self, connection, read_until_eof=False):
            """Await the delayed send, then start the resulting response.

            Raises:
                RuntimeError: if ``send()`` has not been called beforehand.
            """
            if self.send_task is None:
                raise RuntimeError("do a send() before you do a start()")
            resp = await self.send_task
            self.send_task = None
            self.resp = resp
            # From here on the wrapper impersonates the response.
            self._acting_as = self.resp
            self.orig_start = getattr(resp, 'start')
            return await self.orig_start(connection, read_until_eof)

        def close(self):
            """Close the response, if any, and cancel a pending send task."""
            if self.resp is not None:
                self.resp.close()
            if self.send_task is not None:
                self.send_task.cancel()

        async def delayed_send(self, *args, **kwargs):
            """Sleep for the configured delay, then call the real ``send``.

            All arguments are forwarded unchanged to the original ``send``.
            """
            req = self.req
            if self.delay and self.delay > 0:
                await asyncio.sleep(self.delay)
            t = req.loop.time()
            print("sending at {}".format(t), flush=True)
            try:
                # NOTE(review): the result is returned without being awaited;
                # confirm ``send`` is synchronous in the targeted aiohttp
                # version.
                delayed_resp = self.orig_send(*args, **kwargs)
            except Exception:
                # Best effort kept from the original: hand back an empty
                # response object instead of propagating the send failure.
                # NOTE(review): confirm this two-argument constructor call is
                # valid for the targeted aiohttp version.
                return aiohttp.ClientResponse(req.method, req.url)
            return delayed_resp

        def send(self, *args, **kwargs):
            """Schedule ``delayed_send`` on the request's loop.

            Returns the wrapper itself, so the caller can later invoke
            ``start()`` on it.
            """
            task = self.req.loop.create_task(
                self.delayed_send(*args, **kwargs))
            self.send_task = task
            self._acting_as = task
            return self

    def __init__(self, *args, **kwargs):
        """Strip the delay options and initialise the base connector."""
        post_connect_delay = kwargs.pop('post_connect_delay', 0)
        pre_request_delay = kwargs.pop('pre_request_delay', 0)
        super(DelayableTCPConnector, self).__init__(*args, **kwargs)
        self._post_connect_delay = post_connect_delay
        self._pre_request_delay = pre_request_delay

    async def connect(self, req, *args, **kwargs):
        """Open the connection, optionally sleep, then hook ``req.send``.

        Extra positional/keyword arguments (for example ``traces``) are
        forwarded verbatim to ``TCPConnector.connect``. This replaces the
        former ``aiohttp.__version__ >= '3.0'`` switch, which compared
        version strings lexicographically and would misclassify a version
        such as ``'10.0'``.
        """
        d_req = DelayableTCPConnector.RequestContextManager(
            req, self._pre_request_delay)
        conn = await super(DelayableTCPConnector, self).connect(
            req, *args, **kwargs)
        if self._post_connect_delay and self._post_connect_delay > 0:
            # No ``loop=`` argument: asyncio.sleep() no longer accepts it
            # (removed in Python 3.10) and uses the running loop.
            await asyncio.sleep(self._post_connect_delay)
        # Swap in the delayed send only after the connection exists, so the
        # delay happens between connecting and sending the request.
        req.send = d_req.send
        t = req.loop.time()
        print("Connected at {}".format(t), flush=True)
        return conn
class DelayableSanicTestClient(SanicTestClient):
    """``SanicTestClient`` whose requests are sent after a delay.

    Every request goes through a ``DelayableTCPConnector`` configured with
    ``pre_request_delay=request_delay``, so the connection is opened first
    and the request itself is sent ``request_delay`` seconds later.
    """

    def __init__(self, app, loop, request_delay=1):
        """Create the client for ``app``.

        Args:
            app: application passed on to ``SanicTestClient.__init__``.
            loop: accepted but not stored; the event loop is looked up
                lazily in ``_local_request``.
            request_delay: delay, in seconds, before each request is sent.
        """
        super(DelayableSanicTestClient, self).__init__(app)
        self._request_delay = request_delay
        self._loop = None

    async def _local_request(self, method, uri, cookies=None, *args,
                             **kwargs):
        """Perform one delayed HTTP request and return the response.

        ``uri`` is used as-is when it already starts with a scheme (or with
        ``//``); otherwise it is resolved against ``HOST`` and ``self.port``.
        The returned response has ``text``, ``json`` and ``body`` replaced by
        their already-read values; ``text`` / ``json`` are ``None`` when
        decoding fails.
        """
        if self._loop is None:
            self._loop = asyncio.get_event_loop()

        # The comma between 'ftps://' and '//' was missing, which silently
        # merged the two literals into the single prefix 'ftps:////'.
        if uri.startswith(('http:', 'https:', 'ftp:', 'ftps://', '//')):
            url = uri
        else:
            url = 'http://{host}:{port}{uri}'.format(
                host=HOST, port=self.port, uri=uri)

        # Insert a delay after creating the connection
        # But before sending the request.
        conn = DelayableTCPConnector(pre_request_delay=self._request_delay,
                                     verify_ssl=False, loop=self._loop)
        async with aiohttp.ClientSession(cookies=cookies, connector=conn,
                                         loop=self._loop) as session:
            async with getattr(session, method.lower())(
                    url, *args, **kwargs) as response:
                try:
                    response.text = await response.text()
                except UnicodeDecodeError:
                    response.text = None

                try:
                    response.json = await response.json()
                except (JSONDecodeError,
                        UnicodeDecodeError,
                        aiohttp.ClientResponseError):
                    response.json = None

                response.body = await response.read()
                return response
# Set REQUEST_TIMEOUT to 2 on the Config class itself, ahead of creating
# the apps below.
# NOTE(review): presumably seconds, and presumably picked up by the apps
# created afterwards -- confirm against sanic.config.
Config.REQUEST_TIMEOUT = 2
# One app per test case: the first is used by the test that expects a
# timeout, the second by the test that expects a normal response.
request_timeout_default_app = Sanic('test_request_timeout_default')
request_no_timeout_app = Sanic('test_request_no_timeout')
@request_timeout_default_app.route('/1')
async def handler1(request):
    """Answer ``/1`` on the default-timeout app with the plain text ``OK``."""
    ok_response = text('OK')
    return ok_response
@request_no_timeout_app.route('/1')
async def handler2(request):
    """Answer ``/1`` on the no-timeout app with the plain text ``OK``."""
    ok_response = text('OK')
    return ok_response
def test_default_server_error_request_timeout():
    """A request delayed by 3 must be answered with 408 Request Timeout.

    The delay is larger than the value 2 assigned to
    ``Config.REQUEST_TIMEOUT`` in this module.
    """
    delayed_client = DelayableSanicTestClient(
        request_timeout_default_app, None, 3)
    _, resp = delayed_client.get('/1')
    assert resp.status == 408
    assert resp.text == 'Error: Request Timeout'
def test_default_server_error_request_dont_timeout():
    """A request delayed by 1 must still be served normally with 200 / OK.

    The delay is smaller than the value 2 assigned to
    ``Config.REQUEST_TIMEOUT`` in this module.
    """
    delayed_client = DelayableSanicTestClient(
        request_no_timeout_app, None, 1)
    _, resp = delayed_client.get('/1')
    assert resp.status == 200
    assert resp.text == 'OK'