import asyncio import shutil import threading import time from pathlib import Path, PurePosixPath import inotify.adapters import msgspec from cista import config from cista.fileio import fuid from cista.protocol import DirEntry, FileEntry, UpdateEntry pubsub = {} tree = {"": None} tree_lock = threading.Lock() rootpath: Path = None # type: ignore quit = False modified_flags = ( "IN_CREATE", "IN_DELETE", "IN_DELETE_SELF", "IN_MODIFY", "IN_MOVE_SELF", "IN_MOVED_FROM", "IN_MOVED_TO", ) disk_usage = None def watcher_thread(loop): global disk_usage while True: rootpath = config.config.path i = inotify.adapters.InotifyTree(rootpath.as_posix()) old = format_tree() if tree[""] else None with tree_lock: # Initialize the tree from filesystem tree[""] = walk(rootpath) msg = format_tree() if msg != old: asyncio.run_coroutine_threadsafe(broadcast(msg), loop) # The watching is not entirely reliable, so do a full refresh every minute refreshdl = time.monotonic() + 60.0 for event in i.event_gen(): if quit: return # Disk usage update du = shutil.disk_usage(rootpath) if du != disk_usage: disk_usage = du asyncio.run_coroutine_threadsafe(broadcast(format_du()), loop) break # Do a full refresh? if time.monotonic() > refreshdl: break if event is None: continue _, flags, path, filename = event if not any(f in modified_flags for f in flags): continue # Update modified path path = PurePosixPath(path) / filename try: update(path.relative_to(rootpath), loop) except Exception as e: print("Watching error", e) break i = None # Free the inotify object def format_du(): return msgspec.json.encode( { "space": { "disk": disk_usage.total, "used": disk_usage.used, "free": disk_usage.free, "storage": tree[""].size, }, }, ).decode() def format_tree(): root = tree[""] return msgspec.json.encode( { "update": [ UpdateEntry(id=root.id, size=root.size, mtime=root.mtime, dir=root.dir), ], }, ).decode() def walk(path: Path) -> DirEntry | FileEntry | None: try: s = path.stat() id_ = fuid(s) mtime = int(s.st_mtime) if path.is_file(): return FileEntry(id_, s.st_size, mtime) tree = { p.name: v for p in path.iterdir() if not p.name.startswith(".") if (v := walk(p)) is not None } if tree: size = sum(v.size for v in tree.values()) mtime = max(mtime, *(v.mtime for v in tree.values())) else: size = 0 return DirEntry(id_, size, mtime, tree) except FileNotFoundError: return None except OSError as e: print("OS error walking path", path, e) return None def update(relpath: PurePosixPath, loop): """Called by inotify updates, check the filesystem and broadcast any changes.""" new = walk(rootpath / relpath) with tree_lock: update = update_internal(relpath, new) if not update: return # No changes msg = msgspec.json.encode({"update": update}).decode() asyncio.run_coroutine_threadsafe(broadcast(msg), loop) def update_internal( relpath: PurePosixPath, new: DirEntry | FileEntry | None, ) -> list[UpdateEntry]: path = "", *relpath.parts old = tree elems = [] for name in path: if name not in old: # File or folder created old = None elems.append((name, None)) if len(elems) < len(path): # We got a notify for an item whose parent is not in tree print("Tree out of sync DEBUG", relpath) print(elems) print("Current tree:") print(tree[""]) print("Walking all:") print(walk(rootpath)) raise ValueError("Tree out of sync") break old = old[name] elems.append((name, old)) if old == new: return [] mt = new.mtime if new else 0 szdiff = (new.size if new else 0) - (old.size if old else 0) # Update parents update = [] for name, entry in elems[:-1]: u = UpdateEntry(name) if szdiff: entry.size += szdiff u.size = entry.size if mt > entry.mtime: u.mtime = entry.mtime = mt update.append(u) # The last element is the one that changed name, entry = elems[-1] parent = elems[-2][1] if len(elems) > 1 else tree u = UpdateEntry(name) if new: parent[name] = new if u.size != new.size: u.size = new.size if u.mtime != new.mtime: u.mtime = new.mtime if isinstance(new, DirEntry) and u.dir == new.dir: u.dir = new.dir else: del parent[name] u.deleted = True update.append(u) return update async def broadcast(msg): for queue in pubsub.values(): await queue.put_nowait(msg) async def start(app, loop): config.load_config() app.ctx.watcher = threading.Thread(target=watcher_thread, args=[loop]) app.ctx.watcher.start() async def stop(app, loop): global quit quit = True app.ctx.watcher.join()