112 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			112 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| import mimetypes
 | |
| from importlib.resources import files
 | |
| from urllib.parse import unquote
 | |
| 
 | |
| import asyncio
 | |
| import brotli
 | |
| from sanic import Blueprint, Sanic, raw
 | |
| from sanic.exceptions import Forbidden, NotFound
 | |
| 
 | |
| from cista import auth, config, session, watching
 | |
| from cista.api import bp
 | |
| from cista.util import filename
 | |
| from cista.util.apphelpers import handle_sanic_exception
 | |
| 
 | |
# Application object: register the auth and API blueprints, then route every
# exception that escapes a handler through handle_sanic_exception.
app = Sanic(name="cista", strict_slashes=True)
app.blueprint(auth.bp)
app.blueprint(bp)
app.exception(Exception)(handle_sanic_exception)
 | |
| 
 | |
| 
 | |
@app.before_server_start
async def main_start(app, loop):
    """Load the configuration, then start the watching subsystem."""
    config.load_config()
    await watching.start(app, loop)
 | |
| 
 | |
| 
 | |
@app.after_server_stop
async def main_stop(app, loop):
    """Stop the watching subsystem once the server has shut down."""
    await watching.stop(app, loop)
 | |
| 
 | |
| 
 | |
@app.on_request
async def use_session(req):
    """Attach session and user to the request and reject cross-site requests.

    Sets ``req.ctx.session`` from ``session.get(req)`` and ``req.ctx.user``
    from the configured users; the user is ``None`` when the session is
    missing, has no username, or names an unknown user.

    Raises:
        Forbidden: for a non-GET or websocket request whose Origin header
            does not match the request's host.
    """
    req.ctx.session = session.get(req)
    try:
        req.ctx.user = config.config.users[req.ctx.session["username"]]  # type: ignore
    except (AttributeError, KeyError, TypeError):
        req.ctx.user = None
    # CSRF protection
    # The Upgrade token is case-insensitive, so normalise it before comparing.
    upgrade = (req.headers.upgrade or "").lower()
    if req.method == "GET" and upgrade != "websocket":
        return  # Ordinary GET requests are fine
    # Check that origin matches host, for browsers which should all send Origin.
    # Curl doesn't send any Origin header, so we allow it anyway.
    origin = req.headers.origin
    # partition() never raises: an Origin without "//" (e.g. the opaque origin
    # "null") yields an empty host and is rejected with Forbidden rather than
    # failing with an IndexError.
    if origin and origin.partition("//")[2] != req.host:
        raise Forbidden("Invalid origin: Cross-Site requests not permitted")
 | |
| 
 | |
| 
 | |
@app.before_server_start
def http_fileserver(app, _):
    """Serve config.config.path under /files/ with auth.verify as middleware."""
    fileserver = Blueprint("fileserver")
    fileserver.on_request(auth.verify)
    static_options = {
        "use_content_range": True,
        "stream_large_files": True,
        "directory_view": True,
    }
    fileserver.static("/files/", config.config.path, **static_options)
    app.blueprint(fileserver)
 | |
| 
 | |
| 
 | |
# Frontend file cache: POSIX path relative to wwwroot ->
# (raw bytes, Brotli-compressed bytes or False, MIME type).
www = {}


@app.before_server_start
def load_wwwroot(app):
    """Rebuild the in-memory ``www`` cache from the packaged wwwroot folder.

    Every file below ``cista/wwwroot`` is read, its MIME type is guessed from
    its name, and it is precompressed with Brotli. Entries whose bytes are
    unchanged since the previous load are reused so they are not compressed
    again. The new mapping replaces ``www`` in a single assignment.
    """
    global www
    from collections import deque

    wwwnew = {}
    base = files("cista") / "wwwroot"
    # Queue the directory objects themselves instead of re-joining them onto
    # base, so traversal does not depend on base being an absolute path.
    pending = deque([base])
    while pending:
        current = pending.popleft()
        for p in current.iterdir():
            if p.is_dir():
                pending.append(p)
                continue
            name = p.relative_to(base).as_posix()
            mime = mimetypes.guess_type(name)[0] or "application/octet-stream"
            data = p.read_bytes()
            # Use old data if not changed
            if name in www and www[name][0] == data:
                wwwnew[name] = www[name]
                continue
            # Precompress with Brotli; keep it only when it is actually smaller
            compressed = brotli.compress(data)
            br = compressed if len(compressed) < len(data) else False
            wwwnew[name] = data, br, mime
    www = wwwnew
 | |
| 
 | |
@app.add_task
async def refresh_wwwroot():
    """Reload the frontend cache every 0.5 seconds while ``app.debug`` is set.

    A failed rescan is logged and retried on the next tick, so one transient
    filesystem error (such as a file vanishing while the folder is being
    rewritten) does not end the refresh task.
    """
    import logging

    log = logging.getLogger(__name__)
    while app.debug:
        await asyncio.sleep(0.5)
        try:
            load_wwwroot(app)
        except OSError as e:
            # Keep the previous cache and try again on the next iteration.
            log.warning("wwwroot refresh failed, will retry: %r", e)
 | |
| 
 | |
def _accepts_brotli(accept_encoding):
    """Return True if an Accept-Encoding header value permits the "br" coding."""
    for item in accept_encoding.split(","):
        coding, _, params = item.partition(";")
        if coding.strip().lower() != "br":
            continue
        weight = params.replace(" ", "").lower()
        if weight.startswith("q="):
            try:
                # q=0 explicitly refuses the coding
                return float(weight[2:]) > 0
            except ValueError:
                return False
        return True
    return False


@app.get("/<path:path>", static=True)
async def wwwroot(req, path=""):
    """Frontend files only

    Serves a file from the in-memory ``www`` cache, defaulting to
    ``index.html`` for an empty path. The Brotli variant is sent when one is
    cached and the client's Accept-Encoding allows it.

    Raises:
        NotFound: if the requested name is not in the cache.
    """
    name = unquote(path) or "index.html"
    if name not in www:
        raise NotFound(f"File not found: /{path}", extra={"name": name})
    data, br, mime = www[name]
    headers = {}
    if br:
        # The body depends on Accept-Encoding, so caches must key on it.
        headers["vary"] = "accept-encoding"
        # Brotli compressed and acceptable to the client?
        if _accepts_brotli(req.headers.accept_encoding):
            headers["content-encoding"] = "br"
            data = br
    return raw(data, content_type=mime, headers=headers)
 | 
