From 4c51029c9fedef275b5fa0334efb9e05bc08bb7c Mon Sep 17 00:00:00 2001 From: Leo Vasanko Date: Tue, 7 Nov 2023 20:15:09 +0000 Subject: [PATCH] A number of bugfixed on watching, which now works much better. --- cista-front/src/repositories/WS.ts | 3 ++- cista/api.py | 6 ++++-- cista/protocol.py | 6 +++--- cista/watching.py | 24 ++++++++++++++++-------- 4 files changed, 25 insertions(+), 14 deletions(-) diff --git a/cista-front/src/repositories/WS.ts b/cista-front/src/repositories/WS.ts index c8d9db3..b760e17 100644 --- a/cista-front/src/repositories/WS.ts +++ b/cista-front/src/repositories/WS.ts @@ -110,8 +110,9 @@ function handleUpdateMessage(updateData: { update: UpdateEntry[] }) { delete node.dir[elem.name] break // Deleted elements can't have further children } - if (elem.name !== undefined) { + if (elem.name) { // @ts-ignore + console.log(node, elem.name) node = node.dir[elem.name] ||= {} } if (elem.key !== undefined) node.key = elem.key diff --git a/cista/api.py b/cista/api.py index f8d7fee..d7d17b8 100644 --- a/cista/api.py +++ b/cista/api.py @@ -1,5 +1,6 @@ import asyncio import typing +from secrets import token_bytes import msgspec from sanic import Blueprint @@ -100,9 +101,10 @@ async def watch(req, ws): } ).decode() ) + uuid = token_bytes(16) try: with watching.tree_lock: - q = watching.pubsub[ws] = asyncio.Queue() + q = watching.pubsub[uuid] = asyncio.Queue() # Init with disk usage and full tree await ws.send(watching.format_du()) await ws.send(watching.format_tree()) @@ -110,4 +112,4 @@ async def watch(req, ws): while True: await ws.send(await q.get()) finally: - del watching.pubsub[ws] + del watching.pubsub[uuid] diff --git a/cista/protocol.py b/cista/protocol.py index 02a35d9..131b0e0 100644 --- a/cista/protocol.py +++ b/cista/protocol.py @@ -45,7 +45,7 @@ class Rm(ControlBase): sel = [root / filename.sanitize(p) for p in self.sel] for p in sel: if p.is_dir(): - shutil.rmtree(p, ignore_errors=True) + shutil.rmtree(p) else: p.unlink() @@ -147,9 +147,9 @@ DirList = dict[str, FileEntry | DirEntry] class UpdateEntry(msgspec.Struct, omit_defaults=True): """Updates the named entry in the tree. Fields that are set replace old values. A list of entries recurses directories.""" - name: str = "" + name: str + key: str deleted: bool = False - key: str | None = None size: int | None = None mtime: int | None = None dir: DirList | None = None diff --git a/cista/watching.py b/cista/watching.py index df98116..f268b5f 100644 --- a/cista/watching.py +++ b/cista/watching.py @@ -6,6 +6,7 @@ from pathlib import Path, PurePosixPath import inotify.adapters import msgspec +from sanic.log import logging from cista import config from cista.fileio import fuid @@ -29,7 +30,7 @@ disk_usage = None def watcher_thread(loop): - global disk_usage + global disk_usage, rootpath while True: rootpath = config.config.path @@ -38,6 +39,7 @@ def watcher_thread(loop): with tree_lock: # Initialize the tree from filesystem tree[""] = walk(rootpath) + print(" ".join(tree[""].dir.keys())) msg = format_tree() if msg != old: asyncio.run_coroutine_threadsafe(broadcast(msg), loop) @@ -67,8 +69,8 @@ def watcher_thread(loop): try: update(path.relative_to(rootpath), loop) except Exception as e: - print("Watching error", e) - break + print("Watching error", e, path, rootpath) + raise i = None # Free the inotify object @@ -120,6 +122,8 @@ def walk(path: Path) -> DirEntry | FileEntry | None: def update(relpath: PurePosixPath, loop): """Called by inotify updates, check the filesystem and broadcast any changes.""" + if rootpath is None or relpath is None: + print("ERROR", rootpath, relpath) new = walk(rootpath / relpath) with tree_lock: update = update_internal(relpath, new) @@ -160,7 +164,7 @@ def update_internal( # Update parents update = [] for name, entry in elems[:-1]: - u = UpdateEntry(name) + u = UpdateEntry(name, entry.key) if szdiff: entry.size += szdiff u.size = entry.size @@ -170,14 +174,14 @@ def update_internal( # The last element is the one that changed name, entry = elems[-1] parent = elems[-2][1] if len(elems) > 1 else tree - u = UpdateEntry(name) + u = UpdateEntry(name, new.key if new else entry.key) if new: parent[name] = new if u.size != new.size: u.size = new.size if u.mtime != new.mtime: u.mtime = new.mtime - if isinstance(new, DirEntry) and u.dir == new.dir: + if isinstance(new, DirEntry) and u.dir != new.dir: u.dir = new.dir else: del parent[name] @@ -187,8 +191,12 @@ def update_internal( async def broadcast(msg): - for queue in pubsub.values(): - await queue.put_nowait(msg) + try: + for queue in pubsub.values(): + queue.put_nowait(msg) + except Exception: + # Log because asyncio would silently eat the error + logging.exception("Broadcast error") async def start(app, loop):