cista-storage/cista/api.py

98 lines
2.8 KiB
Python

import asyncio
import typing
import msgspec
from sanic import Blueprint
from cista import watching
from cista.fileio import FileServer
from cista.protocol import ControlBase, FileRange, StatusMsg
from cista.util.apphelpers import asend, websocket_wrapper
# Blueprint collecting the handlers of this module under the "/api" URL prefix.
bp = Blueprint("api", url_prefix="/api")
# Module-level FileServer instance: started and stopped by the server lifecycle
# listeners below, and used by the upload/download handlers through its alink.
fileserver = FileServer()
@bp.before_server_start
async def start_fileserver(app, _):
    """Start the module-level file server.

    Registered as a ``before_server_start`` listener on the blueprint; neither
    argument is used.
    """
    await fileserver.start()
@bp.after_server_stop
async def stop_fileserver(app, _):
    """Stop the module-level file server.

    Registered as an ``after_server_stop`` listener on the blueprint; neither
    argument is used.
    """
    await fileserver.stop()
@bp.websocket("upload")
@websocket_wrapper
async def upload(req, ws):
    """Receive file ranges over the websocket and hand them to the file server.

    The handler loops indefinitely and only ends when an exception propagates.
    Each round works as follows:

    1. A text message is received and decoded from JSON into a ``FileRange``.
    2. Binary messages are received while the position is below ``end``. Each
       one is forwarded as an ``("upload", name, pos, data, size)`` request, and
       the position advances by the size the file server reports back.
    3. A ``StatusMsg`` with status ``"ack"`` echoing the request is sent.

    Raises:
        ValueError: If the control message is not text, or if the position does
            not land exactly on ``end`` once the binary messages stop.
    """
    alink = fileserver.alink
    while True:
        # Forget the previous request before waiting for the next one
        req = None
        control = await ws.recv()
        if not isinstance(control, str):
            raise ValueError(
                f"Expected JSON control, got binary len(data) = {len(control)}"
            )
        req = msgspec.json.decode(control, type=FileRange)
        offset = req.start
        data = None
        while offset < req.end:
            data = await ws.recv()
            # An empty or non-binary message ends the data phase early
            if not data or not isinstance(data, bytes):
                break
            written = await alink(("upload", req.name, offset, data, req.size))
            offset += typing.cast(int, written)
        if offset != req.end:
            if isinstance(data, bytes):
                got = f"{len(data)} bytes"
            else:
                got = data
            raise ValueError(f"Expected {req.end - offset} more bytes, got {got}")
        # Report success
        await asend(ws, StatusMsg(status="ack", req=req))
        # await ws.drain()
@bp.websocket("download")
@websocket_wrapper
async def download(req, ws):
    """Send requested file ranges to the client over the websocket.

    The handler loops indefinitely and only ends when an exception propagates.
    Each round works as follows:

    1. A text message is received and decoded from JSON into a ``FileRange``.
    2. The range ``start``..``end`` is requested from the file server in pieces
       of at most 1 MiB via ``("download", name, pos, end)`` requests, and each
       piece is sent to the client.
    3. A ``StatusMsg`` with status ``"ack"`` echoing the request is sent.

    Raises:
        ValueError: If the control message is not text, or if the file server
            returns no data before the end of the requested range is reached.
    """
    alink = fileserver.alink
    while True:
        req = None
        text = await ws.recv()
        if not isinstance(text, str):
            raise ValueError(
                f"Expected JSON control, got binary len(data) = {len(text)}"
            )
        req = msgspec.json.decode(text, type=FileRange)
        pos = req.start
        while pos < req.end:
            # Cap every read at 1 MiB
            end = min(req.end, pos + (1 << 20))
            data = typing.cast(bytes, await alink(("download", req.name, pos, end)))
            if not data:
                # An empty chunk would leave pos unchanged and spin this loop
                # forever, so fail the request instead.
                raise ValueError(f"Expected {req.end - pos} more bytes, got 0 bytes")
            await asend(ws, data)
            pos += len(data)
        # Report success
        res = StatusMsg(status="ack", req=req)
        await asend(ws, res)
        # await ws.drain()
@bp.websocket("control")
@websocket_wrapper
async def control(req, ws):
    """Execute one control command received over the websocket.

    A single message is received and decoded from JSON into a ``ControlBase``.
    The decoded object is then called in a worker thread through
    ``asyncio.to_thread``, and a ``StatusMsg`` with status ``"ack"`` echoing the
    command is sent back.
    """
    message = await ws.recv()
    command = msgspec.json.decode(message, type=ControlBase)
    # Called via to_thread, i.e. off the event loop thread
    await asyncio.to_thread(command)
    reply = StatusMsg(status="ack", req=command)
    await asend(ws, reply)
@bp.websocket("watch")
@websocket_wrapper
async def watch(req, ws):
    """Send the initial state and then all queued updates to a watching client.

    While ``watching.tree_lock`` is held, a new ``asyncio.Queue`` is registered
    for this websocket in ``watching.pubsub`` and the results of
    ``watching.format_du()`` and ``watching.format_tree()`` are sent. The lock is
    released before the update loop starts; from then on every item taken from
    the queue is forwarded to the client. The ``watching.pubsub`` entry is
    deleted when the handler exits.
    """
    try:
        with watching.tree_lock:
            updates = asyncio.Queue()
            watching.pubsub[ws] = updates
            # Init with disk usage and full tree
            disk_usage = watching.format_du()
            await ws.send(disk_usage)
            tree = watching.format_tree()
            await ws.send(tree)
        # Send updates
        while True:
            update = await updates.get()
            await ws.send(update)
    finally:
        del watching.pubsub[ws]