diff --git a/cista/api.py b/cista/api.py
index d08705f..55c760c 100644
--- a/cista/api.py
+++ b/cista/api.py
@@ -104,11 +104,11 @@ async def watch(req, ws):
     )
     uuid = token_bytes(16)
     try:
-        with watching.tree_lock:
+        with watching.state.lock:
             q = watching.pubsub[uuid] = asyncio.Queue()
             # Init with disk usage and full tree
-            await ws.send(watching.format_du())
-            await ws.send(watching.format_tree())
+            await ws.send(watching.format_du(watching.state.space))
+            await ws.send(watching.format_tree(watching.state.root))
             # Send updates
             while True:
                 await ws.send(await q.get())
diff --git a/cista/app.py b/cista/app.py
index 5672a17..2bf7e93 100644
--- a/cista/app.py
+++ b/cista/app.py
@@ -1,10 +1,8 @@
 import asyncio
 import datetime
 import mimetypes
-from collections import deque
 from concurrent.futures import ThreadPoolExecutor
-from importlib.resources import files
-from pathlib import Path, PurePath
+from pathlib import Path, PurePath, PurePosixPath
 from stat import S_IFDIR, S_IFREG
 from urllib.parse import unquote
 from wsgiref.handlers import format_date_time
@@ -12,15 +10,13 @@ from wsgiref.handlers import format_date_time
 import brotli
 import sanic.helpers
 from blake3 import blake3
-from natsort import natsorted, ns
 from sanic import Blueprint, Sanic, empty, raw
-from sanic.exceptions import Forbidden, NotFound, ServerError, ServiceUnavailable
+from sanic.exceptions import Forbidden, NotFound, ServerError
 from sanic.log import logging
 from stream_zip import ZIP_AUTO, stream_zip

 from cista import auth, config, session, watching
 from cista.api import bp
-from cista.protocol import DirEntry, DirList
 from cista.util.apphelpers import handle_sanic_exception

 # Workaround until Sanic PR #2824 is merged
@@ -186,28 +182,26 @@ async def wwwroot(req, path=""):
     return raw(data, headers=headers)


-def get_files(wanted: set) -> list:
-    if not isinstance(watching.tree[""], DirEntry):
-        raise ServiceUnavailable(headers={"retry-after": 1})
-    root = Path(watching.rootpath)
-    with watching.tree_lock:
-        q: deque[tuple[list[str], None | list[str], DirList]] = deque(
-            [([], None, watching.tree[""].dir)]
-        )
-        while q:
-            locpar, relpar, d = q.pop()
-            for name, attr in d.items():
-                loc = [*locpar, name]
-                rel = None
-                if relpar or attr.key in wanted:
-                    rel = [*relpar, name] if relpar else [name]
-                    wanted.discard(attr.key)
-                isdir = isinstance(attr, DirEntry)
-                if isdir:
-                    q.append((loc, rel, attr.dir))
-                if rel:
-                    files.append(("/".join(rel), root.joinpath(*loc)))
-    return natsorted(files, key=lambda f: "/".join(f[0]), alg=ns.IGNORECASE)
+def get_files(wanted: set) -> list[tuple[PurePosixPath, Path]]:
+    loc = PurePosixPath()
+    idx = 0
+    ret = []
+    level: int | None = None
+    parent: PurePosixPath | None = None
+    with watching.state.lock:
+        root = watching.state.root
+        while idx < len(root):
+            f = root[idx]
+            loc = PurePosixPath(*loc.parts[: f.level - 1]) / f.name
+            if parent is not None and f.level <= level:
+                level = parent = None
+            if f.key in wanted:
+                level, parent = f.level, loc.parent
+            if parent is not None:
+                wanted.discard(f.key)
+                ret.append((loc.relative_to(parent), watching.rootpath / loc))
+            idx += 1
+    return ret


 @app.get("/zip/<keys>/<zipfile:ext>")
@@ -220,7 +214,7 @@ async def zip_download(req, keys, zipfile, ext):
     if not files:
         raise NotFound(
             "No files found",
-            context={"keys": keys, "zipfile": zipfile, "wanted": wanted},
+            context={"keys": keys, "zipfile": f"{zipfile}.{ext}", "wanted": wanted},
         )
     if wanted:
         raise NotFound("Files not found", context={"missing": wanted})
@@ -230,20 +224,25 @@ async def zip_download(req, keys, zipfile, ext):
             s = p.stat()
             size = s.st_size
             modified = datetime.datetime.fromtimestamp(s.st_mtime, datetime.UTC)
+            name = rel.as_posix()
             if p.is_dir():
-                yield rel, modified, S_IFDIR | 0o755, ZIP_AUTO(size), b""
+                yield f"{name}/", modified, S_IFDIR | 0o755, ZIP_AUTO(size), iter(b"")
             else:
-                yield rel, modified, S_IFREG | 0o644, ZIP_AUTO(size), contents(p)
+                yield name, modified, S_IFREG | 0o644, ZIP_AUTO(size), contents(p, size)

-    def contents(name):
+    def contents(name, size):
         with name.open("rb") as f:
-            while chunk := f.read(65536):
+            while size > 0 and (chunk := f.read(min(size, 1 << 20))):
+                size -= len(chunk)
                 yield chunk
+        assert size == 0

     def worker():
         try:
-            for chunk in stream_zip(local_files(files)):
-                asyncio.run_coroutine_threadsafe(queue.put(chunk), loop)
+            with open("test2.zip", "wb") as f:
+                for chunk in stream_zip(local_files(files)):
+                    f.write(chunk)
+                    asyncio.run_coroutine_threadsafe(queue.put(chunk), loop).result()
         except Exception:
             logging.exception("Error streaming ZIP")
             raise
@@ -256,7 +255,10 @@ async def zip_download(req, keys, zipfile, ext):
     thread = loop.run_in_executor(app.ctx.threadexec, worker)

     # Stream the response
-    res = await req.respond(content_type="application/zip")
+    res = await req.respond(
+        content_type="application/zip",
+        headers={"cache-control": "no-store"},
+    )
     while chunk := await queue.get():
         await res.send(chunk)

diff --git a/cista/protocol.py b/cista/protocol.py
index a949d67..86617f6 100644
--- a/cista/protocol.py
+++ b/cista/protocol.py
@@ -112,47 +112,36 @@ class ErrorMsg(msgspec.Struct):
 ## Directory listings


-class FileEntry(msgspec.Struct):
-    key: str
-    size: int
-    mtime: int
-
-
-class DirEntry(msgspec.Struct):
-    key: str
-    size: int
-    mtime: int
-    dir: DirList
-
-    def __getitem__(self, name):
-        return self.dir[name]
-
-    def __setitem__(self, name, value):
-        self.dir[name] = value
-
-    def __contains__(self, name):
-        return name in self.dir
-
-    def __delitem__(self, name):
-        del self.dir[name]
-
-    @property
-    def props(self):
-        return {k: v for k, v in self.__struct_fields__ if k != "dir"}
-
-
-DirList = dict[str, FileEntry | DirEntry]
-
-
-class UpdateEntry(msgspec.Struct, omit_defaults=True):
-    """Updates the named entry in the tree. Fields that are set replace old values. A list of entries recurses directories."""
-
+class FileEntry(msgspec.Struct, array_like=True):
+    level: int
     name: str
     key: str
-    deleted: bool = False
-    size: int | None = None
-    mtime: int | None = None
-    dir: DirList | None = None
+    mtime: int
+    size: int
+    isfile: int
+
+
+class Update(msgspec.Struct, array_like=True):
+    ...
+
+
+class UpdKeep(Update, tag="k"):
+    count: int
+
+
+class UpdDel(Update, tag="d"):
+    count: int
+
+
+class UpdIns(Update, tag="i"):
+    items: list[FileEntry]
+
+
+class Space(msgspec.Struct):
+    disk: int
+    free: int
+    usage: int
+    storage: int


 def make_dir_data(root):
diff --git a/cista/watching.py b/cista/watching.py
index 978a056..aa704e5 100644
--- a/cista/watching.py
+++ b/cista/watching.py
@@ -1,20 +1,133 @@
 import asyncio
 import shutil
+import stat
 import sys
 import threading
 import time
+from os import stat_result
 from pathlib import Path, PurePosixPath

 import msgspec
+from natsort import humansorted, natsort_keygen, ns
 from sanic.log import logging

 from cista import config
 from cista.fileio import fuid
-from cista.protocol import DirEntry, FileEntry, UpdateEntry
+from cista.protocol import FileEntry, Space, UpdDel, UpdIns, UpdKeep

 pubsub = {}
-tree = {"": None}
-tree_lock = threading.Lock()
+sortkey = natsort_keygen(alg=ns.LOCALE)
+
+
+class State:
+    def __init__(self):
+        self.lock = threading.RLock()
+        self._space = Space(0, 0, 0, 0)
+        self._listing: list[FileEntry] = []
+
+    @property
+    def space(self):
+        with self.lock:
+            return self._space
+
+    @space.setter
+    def space(self, space):
+        with self.lock:
+            self._space = space
+
+    @property
+    def root(self) -> list[FileEntry]:
+        with self.lock:
+            return self._listing[:]
+
+    @root.setter
+    def root(self, listing: list[FileEntry]):
+        with self.lock:
+            self._listing = listing
+
+    def _slice(self, idx: PurePosixPath | tuple[PurePosixPath, int]):
+        relpath, relfile = idx if isinstance(idx, tuple) else (idx, 0)
+        begin, end = 0, len(self._listing)
+        level = 0
+        isfile = 0
+        while level < len(relpath.parts):
+            # Enter a subdirectory
+            level += 1
+            begin += 1
+            if level == len(relpath.parts):
+                isfile = relfile
+            name = relpath.parts[level - 1]
+            namesort = sortkey(name)
+            r = self._listing[begin]
+            assert r.level == level
+            # Iterate over items at this level
+            while (
+                begin < end
+                and r.name != name
+                and r.isfile <= isfile
+                and sortkey(r.name) < namesort
+            ):
+                # Skip contents
+                begin += 1
+                while begin < end and self._listing[begin].level > level:
+                    begin += 1
+                # Not found?
+                if begin == end or self._listing[begin].level < level:
+                    return slice(begin, begin)
+                r = self._listing[begin]
+            # Not found?
+            if begin == end or r.name != name:
+                return slice(begin, begin)
+        # Found an item, now find its end
+        for end in range(begin + 1, len(self._listing)):
+            if self._listing[end].level <= level:
+                break
+        return slice(begin, end)
+
+    def __getitem__(self, index: PurePosixPath | tuple[PurePosixPath, int]):
+        with self.lock:
+            print(self._slice(index))
+            return self._listing[self._slice(index)]
+
+    def __setitem__(
+        self, index: tuple[PurePosixPath, int], value: list[FileEntry]
+    ) -> None:
+        rel, isfile = index
+        with self.lock:
+            if rel.parts:
+                parent = self._slice(rel.parent)
+                if parent.start == parent.stop:
+                    raise ValueError(
+                        f"Parent folder {rel.as_posix()} is missing for {rel.name}"
+                    )
+            self._listing[self._slice(index)] = value
+
+    def __delitem__(self, relpath: PurePosixPath):
+        with self.lock:
+            del self._listing[self._slice(relpath)]
+
+    def _index(self, rel: PurePosixPath):
+        idx = 0
+        ret = []
+
+    def _dir(self, idx: int):
+        level = self._listing[idx].level + 1
+        end = len(self._listing)
+        idx += 1
+        ret = []
+        while idx < end and (r := self._listing[idx]).level >= level:
+            if r.level == level:
+                ret.append(idx)
+        return ret, idx
+
+    def update(self, rel: PurePosixPath, value: FileEntry):
+        begin = 0
+        parents = []
+        while self._listing[begin].level < len(rel.parts):
+            parents.append(begin)
+
+
+state = State()
 rootpath: Path = None  # type: ignore
 quit = False
 modified_flags = (
@@ -26,23 +139,22 @@ modified_flags = (
     "IN_MOVED_FROM",
     "IN_MOVED_TO",
 )
-disk_usage = None


 def watcher_thread(loop):
-    global disk_usage, rootpath
+    global rootpath

     import inotify.adapters
     while True:
         rootpath = config.config.path
         i = inotify.adapters.InotifyTree(rootpath.as_posix())
-        old = format_tree() if tree[""] else None
-        with tree_lock:
-            # Initialize the tree from filesystem
-            tree[""] = walk(rootpath)
-            msg = format_tree()
-        if msg != old:
-            asyncio.run_coroutine_threadsafe(broadcast(msg), loop)
+        # Initialize the tree from filesystem
+        old, new = state.root, walk()
+        if old != new:
+            with state.lock:
+                state.root = new
+            msg = format_tree(new)
+            asyncio.run_coroutine_threadsafe(broadcast(msg), loop).result()

         # The watching is not entirely reliable, so do a full refresh every minute
         refreshdl = time.monotonic() + 60.0
@@ -52,9 +164,12 @@
                 return
             # Disk usage update
             du = shutil.disk_usage(rootpath)
-            if du != disk_usage:
-                disk_usage = du
-                asyncio.run_coroutine_threadsafe(broadcast(format_du()), loop)
+            space = Space(*du, storage=state.root[0].size)
+            if space != state.space:
+                state.space = space
+                asyncio.run_coroutine_threadsafe(
+                    broadcast(format_du(space)), loop
+                ).result()
                 break
             # Do a full refresh?
            if time.monotonic() > refreshdl:
@@ -75,141 +190,136 @@


 def watcher_thread_poll(loop):
-    global disk_usage, rootpath
+    global rootpath

     while not quit:
         rootpath = config.config.path
-        old = format_tree() if tree[""] else None
-        with tree_lock:
-            # Initialize the tree from filesystem
-            tree[""] = walk(rootpath)
-            msg = format_tree()
-        if msg != old:
-            asyncio.run_coroutine_threadsafe(broadcast(msg), loop)
+        old = state.root
+        new = walk()
+        if old != new:
+            state.root = new
+            asyncio.run_coroutine_threadsafe(broadcast(format_tree(new)), loop).result()

         # Disk usage update
         du = shutil.disk_usage(rootpath)
-        if du != disk_usage:
-            disk_usage = du
-            asyncio.run_coroutine_threadsafe(broadcast(format_du()), loop)
+        space = Space(*du, storage=state.root[0].size)
+        if space != state.space:
+            state.space = space
+            asyncio.run_coroutine_threadsafe(broadcast(format_du(space)), loop).result()

-        time.sleep(1.0)
+        time.sleep(2.0)


-def format_du():
-    return msgspec.json.encode(
-        {
-            "space": {
-                "disk": disk_usage.total,
-                "used": disk_usage.used,
-                "free": disk_usage.free,
-                "storage": tree[""].size,
-            },
-        },
-    ).decode()
+def format_du(usage):
+    return msgspec.json.encode({"space": usage}).decode()


-def format_tree():
-    root = tree[""]
+def format_tree(root):
     return msgspec.json.encode({"root": root}).decode()


-def walk(path: Path) -> DirEntry | FileEntry | None:
+def walk(rel=PurePosixPath()) -> list[FileEntry]:  # noqa: B008
+    path = rootpath / rel
     try:
-        s = path.stat()
-        key = fuid(s)
-        assert key, repr(key)
-        mtime = int(s.st_mtime)
-        if path.is_file():
-            return FileEntry(key, s.st_size, mtime)
+        st = path.stat()
+    except OSError:
+        return []
+    return _walk(rel, int(not stat.S_ISDIR(st.st_mode)), st)

-        tree = {
-            p.name: v
-            for p in path.iterdir()
-            if not p.name.startswith(".")
-            if (v := walk(p)) is not None
-        }
-        if tree:
-            size = sum(v.size for v in tree.values())
-            mtime = max(mtime, *(v.mtime for v in tree.values()))
-        else:
-            size = 0
-        return DirEntry(key, size, mtime, tree)
+
+def _walk(rel: PurePosixPath, isfile: int, st: stat_result) -> list[FileEntry]:
+    entry = FileEntry(
+        level=len(rel.parts),
+        name=rel.name,
+        key=fuid(st),
+        mtime=int(st.st_mtime),
+        size=st.st_size if isfile else 0,
+        isfile=isfile,
+    )
+    if isfile:
+        return [entry]
+    ret = [entry]
+    path = rootpath / rel
+    try:
+        li = []
+        for f in path.iterdir():
+            if f.name.startswith("."):
+                continue  # No dotfiles
+            s = f.stat()
+            li.append((int(not stat.S_ISDIR(s.st_mode)), f.name, s))
+        for [isfile, name, s] in humansorted(li):
+            subtree = _walk(rel / name, isfile, s)
+            child = subtree[0]
+            entry.mtime = max(entry.mtime, child.mtime)
+            entry.size += child.size
+            ret.extend(subtree)
     except FileNotFoundError:
-        return None
+        pass  # Things may be rapidly in motion
     except OSError as e:
         print("OS error walking path", path, e)
-        return None
+    return ret


 def update(relpath: PurePosixPath, loop):
     """Called by inotify updates, check the filesystem and broadcast any changes."""
     if rootpath is None or relpath is None:
         print("ERROR", rootpath, relpath)
-    new = walk(rootpath / relpath)
-    with tree_lock:
-        update = update_internal(relpath, new)
-    if not update:
-        return  # No changes
-    msg = msgspec.json.encode({"update": update}).decode()
-    asyncio.run_coroutine_threadsafe(broadcast(msg), loop)
-
-
-def update_internal(
-    relpath: PurePosixPath,
-    new: DirEntry | FileEntry | None,
-) -> list[UpdateEntry]:
-    path = "", *relpath.parts
-    old = tree
-    elems = []
-    for name in path:
-        if name not in old:
-            # File or folder created
-            old = None
-            elems.append((name, None))
-            if len(elems) < len(path):
-                # We got a notify for an item whose parent is not in tree
-                print("Tree out of sync DEBUG", relpath)
-                print(elems)
-                print("Current tree:")
-                print(tree[""])
-                print("Walking all:")
-                print(walk(rootpath))
-                raise ValueError("Tree out of sync")
-            break
-        old = old[name]
-        elems.append((name, old))
+    new = walk(relpath)
+    old = state[relpath]
     if old == new:
-        return []
-    mt = new.mtime if new else 0
-    szdiff = (new.size if new else 0) - (old.size if old else 0)
-    # Update parents
-    update = []
-    for name, entry in elems[:-1]:
-        u = UpdateEntry(name, entry.key)
-        if szdiff:
-            entry.size += szdiff
-            u.size = entry.size
-        if mt > entry.mtime:
-            u.mtime = entry.mtime = mt
-        update.append(u)
-    # The last element is the one that changed
-    name, entry = elems[-1]
-    parent = elems[-2][1] if len(elems) > 1 else tree
-    u = UpdateEntry(name, new.key if new else entry.key)
+        return
+    old = state.root
     if new:
-        parent[name] = new
-        if u.size != new.size:
-            u.size = new.size
-        if u.mtime != new.mtime:
-            u.mtime = new.mtime
-        if isinstance(new, DirEntry) and u.dir != new.dir:
-            u.dir = new.dir
+        state[relpath, new[0].isfile] = new
     else:
-        del parent[name]
-        u.deleted = True
-    update.append(u)
-    return update
+        del state[relpath]
+    # FIXME: broadcast format_update()
+    msg = format_update(old, state.root)
+    asyncio.run_coroutine_threadsafe(broadcast(msg), loop).result()
+
+
+def format_update(old, new):
+    # Make keep/del/insert diff until one of the lists ends
+    oidx, nidx = 0, 0
+    update = []
+    keep_count = 0
+    while oidx < len(old) and nidx < len(new):
+        if old[oidx] == new[nidx]:
+            keep_count += 1
+            oidx += 1
+            nidx += 1
+            continue
+        if keep_count > 0:
+            update.append(UpdKeep(keep_count))
+            keep_count = 0
+
+        del_count = 0
+        rest = new[nidx:]
+        while old[oidx] not in rest:
+            del_count += 1
+            oidx += 1
+
+        if del_count:
+            update.append(UpdDel(del_count))
+            oidx += 1
+            continue
+
+        insert_items = []
+        rest = old[oidx:]
+        while nidx < len(new) and new[nidx] not in rest:
+            insert_items.append(new[nidx])
+            nidx += 1
+        update.append(UpdIns(insert_items))

+    # Diff any remaining
+    if keep_count > 0:
+        update.append(UpdKeep(keep_count))
+    if oidx < len(old):
+        update.append(UpdDel(len(old) - oidx))
+    elif nidx < len(new):
+        update.append(UpdIns(new[nidx:]))
+
+    return msgspec.json.encode({"update": update}).decode()


 async def broadcast(msg):