sanic/tests/test_create_task.py

46 lines
931 B
Python
Raw Normal View History

2017-02-11 22:52:40 +00:00
from sanic.response import text
from threading import Event
import asyncio
from queue import Queue
2017-02-11 22:52:40 +00:00
2017-05-04 07:52:18 +01:00
2018-08-26 15:43:14 +01:00
def test_create_task(app):
2017-02-11 22:52:40 +00:00
e = Event()
2017-05-04 07:52:18 +01:00
2017-02-11 22:52:40 +00:00
async def coro():
await asyncio.sleep(0.05)
e.set()
2017-02-12 20:07:59 +00:00
app.add_task(coro)
2017-02-11 22:52:40 +00:00
@app.route('/early')
def not_set(request):
return text(e.is_set())
@app.route('/late')
async def set(request):
await asyncio.sleep(0.1)
return text(e.is_set())
2017-05-04 07:52:18 +01:00
request, response = app.test_client.get('/early')
2017-02-11 22:52:40 +00:00
assert response.body == b'False'
2017-05-04 07:52:18 +01:00
request, response = app.test_client.get('/late')
2017-02-11 22:52:40 +00:00
assert response.body == b'True'
2018-10-22 21:25:38 +01:00
2018-08-26 15:43:14 +01:00
def test_create_task_with_app_arg(app):
q = Queue()
@app.route('/')
def not_set(request):
return "hello"
async def coro(app):
q.put(app.name)
app.add_task(coro)
request, response = app.test_client.get('/')
2018-08-26 15:43:14 +01:00
assert q.get() == 'test_create_task_with_app_arg'