sanic/tests/test_create_task.py
Adam Hopkins b1b12e004e
Signals Integration (#2160)
* Update some tests

* Resolve #2122 route decorator returning tuple

* Use rc sanic-routing version

* Update unit tests to <:str>

* Minimal working version with some signals implemented

* Add more http signals

* Update ASGI and change listeners to signals

* Allow for dynamic ODE signals

* Allow signals to be stacked

* Begin tests

* Prioritize match_info on keyword argument injection

* WIP on tests

* Compat with signals

* Work through some test coverage

* Passing tests

* Post linting

* Setup proper resets

* coverage reporting

* Fixes from vltr comments

* clear delayed tasks

* Fix bad test

* rm pycache
2021-08-05 22:55:42 +03:00

51 lines
1.1 KiB
Python

import asyncio
from threading import Event
from sanic.response import text
def test_create_task(app):
e = Event()
async def coro():
await asyncio.sleep(0.05)
e.set()
@app.route("/early")
def not_set(request):
return text(str(e.is_set()))
@app.route("/late")
async def set(request):
await asyncio.sleep(0.1)
return text(str(e.is_set()))
app.add_task(coro)
request, response = app.test_client.get("/early")
assert response.body == b"False"
app.signal_router.reset()
app.add_task(coro)
request, response = app.test_client.get("/late")
assert response.body == b"True"
def test_create_task_with_app_arg(app):
@app.after_server_start
async def setup_q(app, _):
app.ctx.q = asyncio.Queue()
@app.route("/")
async def not_set(request):
return text(await request.app.ctx.q.get())
async def coro(app):
await app.ctx.q.put(app.name)
app.add_task(coro)
_, response = app.test_client.get("/")
assert response.text == "test_create_task_with_app_arg"