diff --git a/cista/watching.py b/cista/watching.py index 5ead7b3..f5e1ed3 100644 --- a/cista/watching.py +++ b/cista/watching.py @@ -1,11 +1,12 @@ import asyncio import shutil -import stat import sys import threading import time +from contextlib import suppress from os import stat_result from pathlib import Path, PurePosixPath +from stat import S_ISDIR import msgspec from natsort import humansorted, natsort_keygen, ns @@ -115,135 +116,62 @@ state = State() rootpath: Path = None # type: ignore quit = threading.Event() -modified_flags = ( - "IN_CREATE", - "IN_DELETE", - "IN_DELETE_SELF", - "IN_MODIFY", - "IN_MOVE_SELF", - "IN_MOVED_FROM", - "IN_MOVED_TO", -) +## Filesystem scanning -def watcher_thread(loop): - global rootpath - import inotify.adapters - - while not quit.is_set(): - rootpath = config.config.path - i = inotify.adapters.InotifyTree(rootpath.as_posix()) - # Initialize the tree from filesystem - new = walk() - with state.lock: - old = state.root - if old != new: - state.root = new - broadcast(format_update(old, new), loop) - - # The watching is not entirely reliable, so do a full refresh every 30 seconds - refreshdl = time.monotonic() + 30.0 - - for event in i.event_gen(): - if quit.is_set(): - return - # Disk usage update - du = shutil.disk_usage(rootpath) - space = Space(*du, storage=state.root[0].size) - if space != state.space: - state.space = space - broadcast(format_space(space), loop) - break - # Do a full refresh? - if time.monotonic() > refreshdl: - break - if event is None: - continue - _, flags, path, filename = event - if not any(f in modified_flags for f in flags): - continue - # Update modified path - path = PurePosixPath(path) / filename - try: - update(path.relative_to(rootpath), loop) - except Exception as e: - print("Watching error", e, path, rootpath) - raise - i = None # Free the inotify object - - -def watcher_thread_poll(loop): - global rootpath - - while not quit.is_set(): - rootpath = config.config.path - new = walk() - with state.lock: - old = state.root - if old != new: - state.root = new - broadcast(format_update(old, new), loop) - - # Disk usage update - du = shutil.disk_usage(rootpath) - space = Space(*du, storage=state.root[0].size) - if space != state.space: - state.space = space - broadcast(format_space(space), loop) - - quit.wait(2.0) - - -def walk(rel=PurePosixPath()) -> list[FileEntry]: # noqa: B008 - path = rootpath / rel - try: - st = path.stat() - except OSError: - return [] - return _walk(rel, int(not stat.S_ISDIR(st.st_mode)), st) - - -def _walk(rel: PurePosixPath, isfile: int, st: stat_result) -> list[FileEntry]: - entry = FileEntry( - level=len(rel.parts), - name=rel.name, - key=fuid(st), - mtime=int(st.st_mtime), - size=st.st_size if isfile else 0, - isfile=isfile, - ) - if isfile: - return [entry] - ret = [entry] +def walk(rel: PurePosixPath, stat: stat_result | None = None) -> list[FileEntry]: path = rootpath / rel + ret = [] try: + st = stat or path.stat() + isfile = not S_ISDIR(st.st_mode) + entry = FileEntry( + level=len(rel.parts), + name=rel.name, + key=fuid(st), + mtime=int(st.st_mtime), + size=st.st_size if isfile else 0, + isfile=isfile, + ) + if isfile: + return [entry] + # Walk all entries of the directory + ret = [entry] li = [] for f in path.iterdir(): if quit.is_set(): raise SystemExit("quit") if f.name.startswith("."): continue # No dotfiles - s = f.stat() - li.append((int(not stat.S_ISDIR(s.st_mode)), f.name, s)) - for [isfile, name, s] in humansorted(li): - if quit.is_set(): - raise SystemExit("quit") - subtree = _walk(rel / name, isfile, s) - child = subtree[0] + with suppress(FileNotFoundError): + s = f.stat() + li.append((int(not S_ISDIR(s.st_mode)), f.name, s)) + # Build the tree as a list of FileEntries + for [_, name, s] in humansorted(li): + sub = walk(rel / name, stat=s) + ret.extend(sub) + child = sub[0] entry.mtime = max(entry.mtime, child.mtime) entry.size += child.size - ret.extend(subtree) except FileNotFoundError: pass # Things may be rapidly in motion - except OSError as e: - print("OS error walking path", path, e) + except OSError: + logger.error(f"Watching {path=}: {e!r}") return ret -def update(relpath: PurePosixPath, loop): - """Called by inotify updates, check the filesystem and broadcast any changes.""" - if rootpath is None or relpath is None: - print("ERROR", rootpath, relpath) +def update_root(loop): + """Full filesystem scan""" + new = walk(PurePosixPath()) + with state.lock: + old = state.root + if old != new: + state.root = new + broadcast(format_update(old, new), loop) + + +def update_path(relpath: PurePosixPath, loop): + """Called on FS updates, check the filesystem and broadcast any changes.""" new = walk(relpath) with state.lock: old = state[relpath] @@ -257,6 +185,22 @@ def update(relpath: PurePosixPath, loop): broadcast(format_update(old, state.root), loop) +def update_space(loop): + """Called periodically to update the disk usage.""" + du = shutil.disk_usage(rootpath) + space = Space(*du, storage=state.root[0].size) + # Update only on difference above 1 MB + tol = 10**6 + old = msgspec.structs.astuple(state.space) + new = msgspec.structs.astuple(space) + if any(abs(o - n) > tol for o, n in zip(old, new, strict=True)): + state.space = space + broadcast(format_space(space), loop) + + +## Messaging + + def format_update(old, new): # Make keep/del/insert diff until one of the lists ends oidx, nidx = 0, 0 @@ -320,18 +264,70 @@ async def abroadcast(msg): logger.exception("Broadcast error") +## Watcher thread + + +def watcher_inotify(loop): + """Inotify watcher thread (Linux only)""" + import inotify.adapters + + modified_flags = ( + "IN_CREATE", + "IN_DELETE", + "IN_DELETE_SELF", + "IN_MODIFY", + "IN_MOVE_SELF", + "IN_MOVED_FROM", + "IN_MOVED_TO", + ) + while not quit.is_set(): + i = inotify.adapters.InotifyTree(rootpath.as_posix()) + # Initialize the tree from filesystem + update_root(loop) + trefresh = time.monotonic() + 30.0 + tspace = time.monotonic() + 5.0 + # Watch for changes (frequent wakeups needed for quiting) + for event in i.event_gen(timeout_s=0.1): + if quit.is_set(): + break + t = time.monotonic() + # The watching is not entirely reliable, so do a full refresh every 30 seconds + if t >= trefresh: + break + # Disk usage update + if t >= tspace: + tspace = time.monotonic() + 5.0 + update_space(loop) + # Inotify event, update the tree + if event and any(f in modified_flags for f in event[1]): + # Update modified path + update_path(PurePosixPath(event[2]) / event[3], loop) + + del i # Free the inotify object + + +def watcher_poll(loop): + """Polling version of the watcher thread.""" + while not quit.is_set(): + update_root(loop) + update_space(loop) + quit.wait(2.0) + + async def start(app, loop): + global rootpath config.load_config() + rootpath = config.config.path use_inotify = sys.platform == "linux" app.ctx.watcher = threading.Thread( - target=watcher_thread if use_inotify else watcher_thread_poll, + target=watcher_inotify if use_inotify else watcher_poll, args=[loop], - name="watcher", + # Descriptive name for system monitoring + name=f"cista-watcher {rootpath}", ) app.ctx.watcher.start() async def stop(app, loop): - global quit quit.set() app.ctx.watcher.join()