"""Sanic application for cista: session handling, file serving and frontend."""

import asyncio
import mimetypes
from collections import deque
from importlib.resources import files
from urllib.parse import unquote

import brotli
from sanic import Blueprint, Sanic, raw
from sanic.exceptions import Forbidden, NotFound

from cista import auth, config, session, watching
from cista.api import bp
from cista.util.apphelpers import handle_sanic_exception

app = Sanic("cista", strict_slashes=True)
app.blueprint(auth.bp)
app.blueprint(bp)
app.exception(Exception)(handle_sanic_exception)


@app.before_server_start
async def main_start(app, loop):
    """Load the configuration and start the watching module."""
    config.load_config()
    await watching.start(app, loop)


@app.after_server_stop
async def main_stop(app, loop):
    """Stop the watching module after the server has stopped."""
    await watching.stop(app, loop)


@app.on_request
async def use_session(req):
    """Attach session and user to the request and reject cross-site requests.

    Sets ``req.ctx.session`` from ``session.get`` and ``req.ctx.user`` from the
    configured users (``None`` when there is no matching user).

    Raises:
        Forbidden: when a non-GET or websocket request carries an Origin
            header whose host part differs from the request host.
    """
    req.ctx.session = session.get(req)
    try:
        req.ctx.user = config.config.users[req.ctx.session["username"]]  # type: ignore
    except (AttributeError, KeyError, TypeError):
        req.ctx.user = None
    # CSRF protection
    # The Upgrade token is case-insensitive, so normalise before comparing;
    # otherwise "Upgrade: WebSocket" would skip the origin check below.
    is_websocket = req.headers.upgrade.lower() == "websocket"
    if req.method == "GET" and not is_websocket:
        return  # Ordinary GET requests are fine
    # Check that origin matches host, for browsers which should all send Origin.
    # Curl doesn't send any Origin header, so we allow it anyway.
    origin = req.headers.origin
    # partition() instead of split()[1]: an Origin without "//" (such as the
    # opaque origin "null") yields "" and is rejected, rather than raising
    # IndexError.
    if origin and origin.partition("//")[2] != req.host:
        raise Forbidden("Invalid origin: Cross-Site requests not permitted")


@app.before_server_start
def http_fileserver(app, _):
    """Register the authenticated static file server under ``/files/``."""
    # Local name chosen so it does not shadow the imported API blueprint `bp`.
    fileserver = Blueprint("fileserver")
    fileserver.on_request(auth.verify)
    fileserver.static(
        "/files/",
        config.config.path,
        use_content_range=True,
        stream_large_files=True,
        directory_view=True,
    )
    app.blueprint(fileserver)


# Frontend files by relative POSIX path: name -> (data, brotli data or False, mime)
www = {}


@app.before_server_start
def load_wwwroot(app):
    """Load every file under the package's ``wwwroot`` into the ``www`` cache.

    Files whose bytes are unchanged keep their previous cache entry so they
    are not recompressed. The module-level ``www`` dict is replaced as a
    whole once the scan completes.
    """
    global www
    wwwnew = {}
    base = files("cista") / "wwwroot"
    # Breadth-first walk. Directory objects are queued as-is, so no path is
    # re-joined onto `base` (which only worked when `base` was absolute).
    pending = deque([base])
    while pending:
        current = pending.popleft()
        for p in current.iterdir():
            if p.is_dir():
                pending.append(p)
                continue
            name = p.relative_to(base).as_posix()
            mime = mimetypes.guess_type(name)[0] or "application/octet-stream"
            data = p.read_bytes()
            # Use old data if not changed
            if name in www and www[name][0] == data:
                wwwnew[name] = www[name]
                continue
            # Precompress with Brotli; keep it only when it is actually smaller
            br = brotli.compress(data)
            if len(br) >= len(data):
                br = False
            wwwnew[name] = data, br, mime
    www = wwwnew


@app.add_task
async def refresh_wwwroot():
    """Reload the frontend files every 0.5 s for as long as ``app.debug`` is set."""
    while app.debug:
        await asyncio.sleep(0.5)
        load_wwwroot(app)


def _accepts_brotli(accept_encoding):
    """Return True if an Accept-Encoding header value permits ``br``.

    Tolerates missing whitespace after commas and optional parameters
    (``gzip,deflate,br`` or ``br;q=0.8``). A ``br`` entry with weight zero or
    a malformed weight is treated as not acceptable.
    """
    for item in accept_encoding.split(","):
        coding, _, params = item.partition(";")
        if coding.strip().lower() != "br":
            continue
        qname, _, qvalue = params.partition("=")
        if qname.strip().lower() != "q":
            return True  # No weight given: acceptable
        try:
            return float(qvalue) > 0
        except ValueError:
            return False  # Malformed weight: fall back to uncompressed
    return False


# NOTE(review): the route is "/" with no path parameter, so `path` keeps its
# default "" and only index.html is reachable through this handler. Confirm
# whether a "/<path:path>" route is intended for the other cached files.
@app.get("/", static=True)
async def wwwroot(req, path=""):
    """Frontend files only"""
    name = unquote(path) or "index.html"
    if name not in www:
        raise NotFound(f"File not found: /{path}", extra={"name": name})
    data, br, mime = www[name]
    headers = {}
    # Brotli compressed?
    if br and _accepts_brotli(req.headers.accept_encoding):
        headers["content-encoding"] = "br"
        data = br
    return raw(data, content_type=mime, headers=headers)