diff --git a/cista-front/vite.config.ts b/cista-front/vite.config.ts index 39ef537..9e1b9d8 100644 --- a/cista-front/vite.config.ts +++ b/cista-front/vite.config.ts @@ -44,6 +44,7 @@ export default defineConfig({ "/files": dev_backend, "/login": dev_backend, "/logout": dev_backend, + "/zip": dev_backend, } }, build: { diff --git a/cista/app.py b/cista/app.py index 33af9b0..b621276 100644 --- a/cista/app.py +++ b/cista/app.py @@ -1,5 +1,6 @@ import asyncio import mimetypes +from concurrent.futures import ThreadPoolExecutor from importlib.resources import files from urllib.parse import unquote from wsgiref.handlers import format_date_time @@ -9,9 +10,11 @@ import sanic.helpers from blake3 import blake3 from sanic import Blueprint, Sanic, empty, raw from sanic.exceptions import Forbidden, NotFound +from sanic.log import logging from cista import auth, config, session, watching from cista.api import bp +from cista.protocol import DirEntry from cista.util.apphelpers import handle_sanic_exception # Workaround until Sanic PR #2824 is merged @@ -27,11 +30,13 @@ app.exception(Exception)(handle_sanic_exception) async def main_start(app, loop): config.load_config() await watching.start(app, loop) + app.ctx.threadexec = ThreadPoolExecutor(max_workers=8) @app.after_server_stop async def main_stop(app, loop): await watching.stop(app, loop) + app.ctx.threadexec.shutdown() @app.on_request @@ -161,3 +166,83 @@ async def wwwroot(req, path=""): } data = br return raw(data, headers=headers) + + +import datetime +from collections import deque +from pathlib import Path +from stat import S_IFREG + +from stream_zip import ZIP_AUTO, stream_zip + + +@app.get("/zip//") +async def zip_download(req, keys, zipfile, ext): + """Download a zip archive of the given keys""" + wanted = set(keys.split("+")) + with watching.tree_lock: + q = deque([([], None, watching.tree[""].dir)]) + files = [] + while q: + locpar, relpar, d = q.pop() + for name, attr in d.items(): + loc = [*locpar, name] + rel = None + if relpar or attr.key in wanted: + rel = [*relpar, name] if relpar else [name] + wanted.remove(attr.key) + if isinstance(attr, DirEntry): + q.append((loc, rel, attr.dir)) + elif rel: + files.append( + ( + "/".join(rel), + Path(watching.rootpath.joinpath(*loc)), + attr.mtime, + attr.size, + ) + ) + + if not files: + raise NotFound( + "No files found", + context={"keys": keys, "zipfile": zipfile, "wanted": wanted}, + ) + if wanted: + raise NotFound("Files not found", context={"missing": wanted}) + + for rel, p, mtime, size in files: + if not p.is_file(): + raise NotFound(f"File not found {rel}") + + def local_files(files): + for rel, p, mtime, size in files: + modified = datetime.datetime.fromtimestamp(mtime, datetime.UTC) + yield rel, modified, S_IFREG | 0o644, ZIP_AUTO(size), contents(p) + + def contents(name): + with name.open("rb") as f: + while chunk := f.read(65536): + yield chunk + + def worker(): + try: + for chunk in stream_zip(local_files(files)): + asyncio.run_coroutine_threadsafe(queue.put(chunk), loop) + except Exception: + logging.exception("Error streaming ZIP") + raise + finally: + asyncio.run_coroutine_threadsafe(queue.put(None), loop) + + # Don't block the event loop: run in a thread + queue = asyncio.Queue(maxsize=1) + loop = asyncio.get_event_loop() + thread = loop.run_in_executor(app.ctx.threadexec, worker) + + # Stream the response + res = await req.respond(content_type="application/zip") + while chunk := await queue.get(): + await res.send(chunk) + + await thread # If it raises, the response will fail download diff --git a/pyproject.toml b/pyproject.toml index 25b0b1d..f51523d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,6 @@ name = "cista" dynamic = ["version"] description = "Dropbox-like file server with modern web interface" readme = "README.md" -license = "Public Domain" authors = [ { name = "Vasanko" }, ] @@ -25,6 +24,7 @@ dependencies = [ "pyjwt", "sanic", "tomli_w", + "stream-zip", ] [project.urls]