cista-storage/cista/app.py

102 lines
2.9 KiB
Python
Executable File

import mimetypes
from importlib.resources import files
from urllib.parse import unquote
import brotli
from sanic import Blueprint, Sanic, raw
from sanic.exceptions import Forbidden, NotFound
from cista import auth, config, session, watching
from cista.api import bp
from cista.util import filename
from cista.util.apphelpers import handle_sanic_exception
app = Sanic("cista", strict_slashes=True)
app.blueprint(auth.bp)
app.blueprint(bp)
app.exception(Exception)(handle_sanic_exception)
@app.before_server_start
async def main_start(app, loop):
config.load_config()
await watching.start(app, loop)
@app.after_server_stop
async def main_stop(app, loop):
await watching.stop(app, loop)
@app.on_request
async def use_session(req):
req.ctx.session = session.get(req)
try:
req.ctx.user = config.config.users[req.ctx.session["username"]] # type: ignore
except (AttributeError, KeyError, TypeError):
req.ctx.user = None
# CSRF protection
if req.method == "GET" and req.headers.upgrade != "websocket":
return # Ordinary GET requests are fine
# Check that origin matches host, for browsers which should all send Origin.
# Curl doesn't send any Origin header, so we allow it anyway.
origin = req.headers.origin
if origin and origin.split("//", 1)[1] != req.host:
raise Forbidden("Invalid origin: Cross-Site requests not permitted")
@app.before_server_start
def http_fileserver(app, _):
bp = Blueprint("fileserver")
bp.on_request(auth.verify)
bp.static(
"/files/",
config.config.path,
use_content_range=True,
stream_large_files=True,
directory_view=True,
)
app.blueprint(bp)
www = {}
@app.before_server_start
def load_wwwroot(app):
www.clear()
base = files("cista") / "wwwroot"
paths = ["."]
while paths:
path = paths.pop(0)
current = base / path
print(">>>", current)
for p in current.iterdir():
if p.is_dir():
print(p)
paths.append(current / p.parts[-1])
continue
name = p.relative_to(base).as_posix()
mime = mimetypes.guess_type(name)[0] or "application/octet-stream"
data = p.read_bytes()
br = brotli.compress(data)
if len(br) >= len(data):
br = False
print(name, len(data), len(br) if br else br, mime)
www[name] = data, br, mime
@app.get("/<path:path>", static=True)
async def wwwroot(req, path=""):
"""Frontend files only"""
name = unquote(path) or "index.html"
if name not in www:
raise NotFound(f"File not found: /{path}", extra={"name": name})
data, br, mime = www[name]
headers = {}
# Brotli compressed?
if br and "br" in req.headers.accept_encoding.split(", "):
headers["content-encoding"] = "br"
data = br
return raw(data, content_type=mime, headers=headers)