b1b12e004e
* Update some tests * Resolve #2122 route decorator returning tuple * Use rc sanic-routing version * Update unit tests to <:str> * Minimal working version with some signals implemented * Add more http signals * Update ASGI and change listeners to signals * Allow for dynamic ODE signals * Allow signals to be stacked * Begin tests * Prioritize match_info on keyword argument injection * WIP on tests * Compat with signals * Work through some test coverage * Passing tests * Post linting * Setup proper resets * coverage reporting * Fixes from vltr comments * clear delayed tasks * Fix bad test * rm pycache
51 lines
1.1 KiB
Python
51 lines
1.1 KiB
Python
import asyncio
|
|
|
|
from threading import Event
|
|
|
|
from sanic.response import text
|
|
|
|
|
|
def test_create_task(app):
|
|
e = Event()
|
|
|
|
async def coro():
|
|
await asyncio.sleep(0.05)
|
|
e.set()
|
|
|
|
@app.route("/early")
|
|
def not_set(request):
|
|
return text(str(e.is_set()))
|
|
|
|
@app.route("/late")
|
|
async def set(request):
|
|
await asyncio.sleep(0.1)
|
|
return text(str(e.is_set()))
|
|
|
|
app.add_task(coro)
|
|
|
|
request, response = app.test_client.get("/early")
|
|
assert response.body == b"False"
|
|
|
|
app.signal_router.reset()
|
|
app.add_task(coro)
|
|
request, response = app.test_client.get("/late")
|
|
assert response.body == b"True"
|
|
|
|
|
|
def test_create_task_with_app_arg(app):
|
|
@app.after_server_start
|
|
async def setup_q(app, _):
|
|
app.ctx.q = asyncio.Queue()
|
|
|
|
@app.route("/")
|
|
async def not_set(request):
|
|
return text(await request.app.ctx.q.get())
|
|
|
|
async def coro(app):
|
|
await app.ctx.q.put(app.name)
|
|
|
|
app.add_task(coro)
|
|
|
|
_, response = app.test_client.get("/")
|
|
assert response.text == "test_create_task_with_app_arg"
|