import asyncio import typing import msgspec from sanic import Blueprint, SanicException from cista import watching from cista.fileio import FileServer from cista.protocol import ControlBase, ErrorMsg, FileRange, StatusMsg from cista.util.apphelpers import asend, websocket_wrapper bp = Blueprint("api", url_prefix="/api") fileserver = FileServer() @bp.before_server_start async def start_fileserver(app, _): await fileserver.start() @bp.after_server_stop async def stop_fileserver(app, _): await fileserver.stop() @bp.websocket('upload') @websocket_wrapper async def upload(req, ws): alink = fileserver.alink while True: req = None text = await ws.recv() if not isinstance(text, str): raise ValueError(f"Expected JSON control, got binary len(data) = {len(text)}") req = msgspec.json.decode(text, type=FileRange) pos = req.start data = None while pos < req.end and (data := await ws.recv()) and isinstance(data, bytes): sentsize = await alink(("upload", req.name, pos, data, req.size)) pos += typing.cast(int, sentsize) if pos != req.end: d = f"{len(data)} bytes" if isinstance(data, bytes) else data raise ValueError(f"Expected {req.end - pos} more bytes, got {d}") # Report success res = StatusMsg(status="ack", req=req) await asend(ws, res) #await ws.drain() @bp.websocket('download') @websocket_wrapper async def download(req, ws): alink = fileserver.alink while True: req = None text = await ws.recv() if not isinstance(text, str): raise ValueError(f"Expected JSON control, got binary len(data) = {len(text)}") req = msgspec.json.decode(text, type=FileRange) pos = req.start while pos < req.end: end = min(req.end, pos + (1<<20)) data = typing.cast(bytes, await alink(("download", req.name, pos, end))) await asend(ws, data) pos += len(data) # Report success res = StatusMsg(status="ack", req=req) await asend(ws, res) #await ws.drain() @bp.websocket("control") @websocket_wrapper async def control(req, ws): cmd = msgspec.json.decode(await ws.recv(), type=ControlBase) await asyncio.to_thread(cmd) await asend(ws, StatusMsg(status="ack", req=cmd)) @bp.websocket("watch") @websocket_wrapper async def watch(req, ws): try: with watching.tree_lock: q = watching.pubsub[ws] = asyncio.Queue() # Init with disk usage and full tree await ws.send(watching.format_du()) await ws.send(watching.format_tree()) # Send updates while True: await ws.send(await q.get()) finally: del watching.pubsub[ws]