sanic/tests/test_create_task.py

51 lines
1.1 KiB
Python
Raw Normal View History

2017-02-11 14:52:40 -08:00
import asyncio
from threading import Event
from sanic.response import text
2017-02-11 14:52:40 -08:00
2017-05-04 15:52:18 +09:00
2018-08-26 16:43:14 +02:00
def test_create_task(app):
2017-02-11 14:52:40 -08:00
e = Event()
2017-05-04 15:52:18 +09:00
2017-02-11 14:52:40 -08:00
async def coro():
await asyncio.sleep(0.05)
e.set()
2018-12-30 13:18:06 +02:00
@app.route("/early")
2017-02-11 14:52:40 -08:00
def not_set(request):
return text(str(e.is_set()))
2017-02-11 14:52:40 -08:00
2018-12-30 13:18:06 +02:00
@app.route("/late")
2017-02-11 14:52:40 -08:00
async def set(request):
await asyncio.sleep(0.1)
return text(str(e.is_set()))
2017-02-11 14:52:40 -08:00
app.add_task(coro)
2018-12-30 13:18:06 +02:00
request, response = app.test_client.get("/early")
assert response.body == b"False"
2017-02-11 14:52:40 -08:00
app.signal_router.reset()
app.add_task(coro)
2018-12-30 13:18:06 +02:00
request, response = app.test_client.get("/late")
assert response.body == b"True"
2018-10-22 22:25:38 +02:00
2018-08-26 16:43:14 +02:00
def test_create_task_with_app_arg(app):
@app.after_server_start
async def setup_q(app, _):
app.ctx.q = asyncio.Queue()
2018-12-30 13:18:06 +02:00
@app.route("/")
async def not_set(request):
return text(await request.app.ctx.q.get())
async def coro(app):
await app.ctx.q.put(app.name)
app.add_task(coro)
_, response = app.test_client.get("/")
assert response.text == "test_create_task_with_app_arg"