sanic/examples/run_asgi.py

89 lines
1.9 KiB
Python
Raw Permalink Normal View History

2019-05-21 23:42:19 +01:00
"""
1. Create a simple Sanic app
2019-05-27 00:11:52 +01:00
2. Run with an ASGI server:
2019-05-21 23:42:19 +01:00
$ uvicorn run_asgi:app
or
$ hypercorn run_asgi:app
"""
2019-05-27 00:11:52 +01:00
from pathlib import Path
2019-05-26 22:57:50 +01:00
from sanic import Sanic, response
2019-05-21 23:42:19 +01:00
# Application instance; an ASGI server loads it via the ``run_asgi:app`` target.
app = Sanic(__name__)
2019-05-26 22:57:50 +01:00
@app.route("/text")
def handler_text(request):
    """Answer ``/text`` with the plain-text body ``Hello``."""
    greeting = response.text("Hello")
    return greeting
2019-05-21 23:42:19 +01:00
2019-05-26 22:57:50 +01:00
@app.route("/json")
def handler_json(request):
    """Answer ``/json`` with a small fixed JSON document."""
    payload = {"foo": "bar"}
    return response.json(payload)
2019-05-21 23:42:19 +01:00
2019-05-26 22:57:50 +01:00
@app.websocket("/ws")
async def handler_ws(request, ws):
    """Greet the websocket peer by name until it sends an empty message.

    The first greeting uses the placeholder ``<someone>``; each later
    greeting uses whatever the peer sent last. A falsy message ends the
    conversation.
    """
    who = "<someone>"
    # ``who`` starts truthy, so a greeting is always sent before the first read.
    while who:
        await ws.send(f"Hello {who}")
        who = await ws.recv()
2019-05-26 22:57:50 +01:00
@app.route("/file")
async def handler_file(request):
    """Serve ``setup.py`` from the parent directory.

    The path is relative, so it is resolved against the process's current
    working directory.
    """
    target = Path("../") / "setup.py"
    served = await response.file(target)
    return served
2019-05-26 22:57:50 +01:00
@app.route("/file_stream")
async def handler_file_stream(request):
    """Stream ``setup.py`` from the parent directory with ``chunk_size=1024``.

    The path is relative, so it is resolved against the process's current
    working directory.
    """
    target = Path("../") / "setup.py"
    streamed = await response.file_stream(target, chunk_size=1024)
    return streamed
2019-05-27 00:11:52 +01:00
@app.route("/stream", stream=True)
async def handler_stream(request):
    """Echo the streamed request body back, replacing every ``1`` with ``A``.

    The request body is consumed chunk by chunk from ``request.stream``.
    Each chunk is decoded as UTF-8, transformed, and written straight to
    the streaming response, so the full body is never buffered.

    Returns:
        The streaming response built by ``response.stream``.
    """

    async def relay_chunks(resp):
        # ``response.stream`` expects a coroutine function that receives the
        # response object and writes to it; passing the decoded string (or
        # ``None`` once the body is exhausted) would never send the data.
        while True:
            body = await request.stream.read()
            if body is None:
                # ``None`` marks the end of the request body.
                break
            await resp.write(body.decode("utf-8").replace("1", "A"))

    return response.stream(relay_chunks)
@app.listener("before_server_start")
async def listener_before_server_start(*args, **kwargs):
    """Print a marker when the ``before_server_start`` event fires."""
    marker = "before_server_start"
    print(marker)
@app.listener("after_server_start")
async def listener_after_server_start(*args, **kwargs):
    """Print a marker when the ``after_server_start`` event fires."""
    marker = "after_server_start"
    print(marker)
@app.listener("before_server_stop")
async def listener_before_server_stop(*args, **kwargs):
    """Print a marker when the ``before_server_stop`` event fires."""
    marker = "before_server_stop"
    print(marker)
@app.listener("after_server_stop")
async def listener_after_server_stop(*args, **kwargs):
    """Print a marker when the ``after_server_stop`` event fires."""
    marker = "after_server_stop"
    print(marker)
@app.middleware("request")
async def print_on_request(request):
    """Request middleware that prints a marker and returns nothing."""
    marker = "print_on_request"
    print(marker)
@app.middleware("response")
async def print_on_response(request, response):
    """Response middleware that prints a marker and returns nothing."""
    marker = "print_on_response"
    print(marker)