From 228b75a20dc68d0049592c5a2a0047236a754a6a Mon Sep 17 00:00:00 2001 From: Leo Vasanko Date: Thu, 19 Oct 2023 20:54:33 +0300 Subject: [PATCH] Watching cleanup, debugging a problem case. --- cista/watching.py | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/cista/watching.py b/cista/watching.py index 5e7a549..4457f9b 100755 --- a/cista/watching.py +++ b/cista/watching.py @@ -10,7 +10,6 @@ from watchdog.observers import Observer from . import config from .protocol import DirEntry, FileEntry, UpdateEntry -secret = secrets.token_bytes(8) pubsub = {} def walk(path: Path) -> DirEntry | FileEntry | None: @@ -33,7 +32,7 @@ def walk(path: Path) -> DirEntry | FileEntry | None: print("OS error walking path", path, e) return None -tree = None +tree = {"": None} tree_lock = threading.Lock() rootpath = None @@ -43,14 +42,17 @@ def refresh(): UpdateEntry(size=root.size, mtime=root.mtime, dir=root.dir) ]}).decode() -def update(relpath: PurePosixPath, loop): +def update(relpath: Path, loop): + """Called by inotify updates, check the filesystem and broadcast any changes.""" new = walk(rootpath / relpath) with tree_lock: - msg = update_internal(relpath, new) + update = update_internal(relpath, new) + if not update: return # No changes + msg = msgspec.json.encode({"update": update}).decode() print(msg) asyncio.run_coroutine_threadsafe(broadcast(msg), loop) -def update_internal(relpath: PurePosixPath, new: DirEntry | FileEntry | None): +def update_internal(relpath: PurePosixPath, new: DirEntry | FileEntry | None) -> list[UpdateEntry]: path = "", *relpath.parts old = tree elems = [] @@ -60,12 +62,19 @@ def update_internal(relpath: PurePosixPath, new: DirEntry | FileEntry | None): old = None elems.append((name, None)) if len(elems) < len(path): + # We got a notify for an item whose parent is not in tree + print("Tree out of sync DEBUG", relpath) + print(elems) + print("Current tree:") + print(tree[""]) + print("Walking all:") + print(walk(rootpath)) raise ValueError("Tree out of sync") break old = old[name] elems.append((name, old)) if old == new: - return # No changes + return [] mt = new.mtime if new else 0 szdiff = (new.size if new else 0) - (old.size if old else 0) # Update parents @@ -93,7 +102,7 @@ def update_internal(relpath: PurePosixPath, new: DirEntry | FileEntry | None): del parent[name] u.deleted = True update.append(u) - return msgspec.json.encode({"update": update}).decode() + return update async def broadcast(msg): for queue in pubsub.values(): @@ -102,11 +111,11 @@ async def broadcast(msg): def register(app, url): @app.before_server_start async def start_watcher(app, loop): - global tree, rootpath + global rootpath config.load_config() + # Initialize the tree from filesystem rootpath = config.config.path - # Pseudo nameless root entry to ease updates - tree = {"": walk(rootpath)} + tree[""] = walk(rootpath) class Handler(FileSystemEventHandler): def on_any_event(self, event): update(Path(event.src_path).relative_to(rootpath), loop)