From 9939cb33fac1472bf28d2e65539895387a54f237 Mon Sep 17 00:00:00 2001 From: Leo Vasanko Date: Thu, 19 Oct 2023 23:52:37 +0300 Subject: [PATCH] Watcher rewritten with inotify module, bugs fixed. --- cista/__main__.py | 2 +- cista/static/index.html | 2 ++ cista/watching.py | 60 ++++++++++++++++++++++++++++------------- pyproject.toml | 2 +- 4 files changed, 45 insertions(+), 21 deletions(-) diff --git a/cista/__main__.py b/cista/__main__.py index cb2c3ce..7054c11 100755 --- a/cista/__main__.py +++ b/cista/__main__.py @@ -52,7 +52,7 @@ def _main(): listen = args["-l"] # Validate arguments first if args[""]: - path = Path(args[""]) + path = Path(args[""]).resolve() if not path.is_dir(): raise ValueError(f"No such directory: {path}") else: diff --git a/cista/static/index.html b/cista/static/index.html index 3e76385..db336cb 100755 --- a/cista/static/index.html +++ b/cista/static/index.html @@ -54,9 +54,11 @@ function createWatchSocket() { createWatchSocket() function tree_update(msg) { + console.log("Tree update", msg) let node = files for (const elem of msg) { if (elem.deleted) { + const p = node.dir[elem.name].path delete node.dir[elem.name] delete flatfiles[p] break diff --git a/cista/watching.py b/cista/watching.py index 4457f9b..85ab984 100755 --- a/cista/watching.py +++ b/cista/watching.py @@ -1,16 +1,47 @@ import asyncio -import secrets import threading +import time from pathlib import Path, PurePosixPath +from socket import timeout +import inotify.adapters import msgspec -from watchdog.events import FileSystemEventHandler -from watchdog.observers import Observer from . import config from .protocol import DirEntry, FileEntry, UpdateEntry pubsub = {} +tree = {"": None} +tree_lock = threading.Lock() +rootpath = None +quit = False +modified_flags = "IN_CREATE", "IN_DELETE", "IN_DELETE_SELF", "IN_MODIFY", "IN_MOVE_SELF", "IN_MOVED_FROM", "IN_MOVED_TO" + +def watcher_thread(loop): + while True: + i = inotify.adapters.InotifyTree(rootpath.as_posix()) + old = refresh() if tree[""] else None + with tree_lock: + # Initialize the tree from filesystem + tree[""] = walk(rootpath) + msg = refresh() + if msg != old: + asyncio.run_coroutine_threadsafe(broadcast(msg), loop) + + # The watching is not entirely reliable, so do a full refresh every minute + refreshdl = time.monotonic() + 60.0 + + for event in i.event_gen(): + if quit: return + if time.monotonic() > refreshdl: break + if event is None: continue + _, flags, path, filename = event + if not any(f in modified_flags for f in flags): + continue + path = PurePosixPath(path) / filename + #print(path, flags) + update(path.relative_to(rootpath), loop) + i = None # Free the inotify object def walk(path: Path) -> DirEntry | FileEntry | None: try: @@ -32,10 +63,6 @@ def walk(path: Path) -> DirEntry | FileEntry | None: print("OS error walking path", path, e) return None -tree = {"": None} -tree_lock = threading.Lock() -rootpath = None - def refresh(): root = tree[""] return msgspec.json.encode({"update": [ @@ -49,7 +76,6 @@ def update(relpath: Path, loop): update = update_internal(relpath, new) if not update: return # No changes msg = msgspec.json.encode({"update": update}).decode() - print(msg) asyncio.run_coroutine_threadsafe(broadcast(msg), loop) def update_internal(relpath: PurePosixPath, new: DirEntry | FileEntry | None) -> list[UpdateEntry]: @@ -88,7 +114,6 @@ def update_internal(relpath: PurePosixPath, new: DirEntry | FileEntry | None) -> u.mtime = entry.mtime = mt update.append(u) # The last element is the one that changed - print([e[0] for e in elems]) name, entry = elems[-1] parent = elems[-2][1] if len(elems) > 1 else tree u = UpdateEntry(name) @@ -113,27 +138,24 @@ def register(app, url): async def start_watcher(app, loop): global rootpath config.load_config() - # Initialize the tree from filesystem rootpath = config.config.path - tree[""] = walk(rootpath) - class Handler(FileSystemEventHandler): - def on_any_event(self, event): - update(Path(event.src_path).relative_to(rootpath), loop) - app.ctx.observer = Observer() - app.ctx.observer.schedule(Handler(), str(rootpath), recursive=True) - app.ctx.observer.start() + app.ctx.watcher = threading.Thread(target=watcher_thread, args=[loop]) + app.ctx.watcher.start() @app.after_server_stop async def stop_watcher(app, _): - app.ctx.observer.stop() - app.ctx.observer.join() + global quit + quit = True + app.ctx.watcher.join() @app.websocket(url) async def watch(request, ws): try: with tree_lock: q = pubsub[ws] = asyncio.Queue() + # Init with full tree await ws.send(refresh()) + # Send updates while True: await ws.send(await q.get()) finally: diff --git a/pyproject.toml b/pyproject.toml index 2927be2..1b96717 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,12 +16,12 @@ classifiers = [ dependencies = [ "argon2-cffi", "docopt", + "inotify", "msgspec", "pathvalidate", "pyjwt", "sanic", "tomli_w", - "watchdog", ] [project.urls]