import asyncio import shutil import stat import sys import threading import time from os import stat_result from pathlib import Path, PurePosixPath import msgspec from natsort import humansorted, natsort_keygen, ns from sanic.log import logging from cista import config from cista.fileio import fuid from cista.protocol import FileEntry, Space, UpdDel, UpdIns, UpdKeep pubsub = {} sortkey = natsort_keygen(alg=ns.LOCALE) class State: def __init__(self): self.lock = threading.RLock() self._space = Space(0, 0, 0, 0) self._listing: list[FileEntry] = [] @property def space(self): with self.lock: return self._space @space.setter def space(self, space): with self.lock: self._space = space @property def root(self) -> list[FileEntry]: with self.lock: return self._listing[:] @root.setter def root(self, listing: list[FileEntry]): with self.lock: self._listing = listing def _slice(self, idx: PurePosixPath | tuple[PurePosixPath, int]): relpath, relfile = idx if isinstance(idx, tuple) else (idx, 0) begin, end = 0, len(self._listing) level = 0 isfile = 0 while level < len(relpath.parts): # Enter a subdirectory level += 1 begin += 1 if level == len(relpath.parts): isfile = relfile name = relpath.parts[level - 1] namesort = sortkey(name) r = self._listing[begin] assert r.level == level # Iterate over items at this level while ( begin < end and r.name != name and r.isfile <= isfile and sortkey(r.name) < namesort ): # Skip contents begin += 1 while begin < end and self._listing[begin].level > level: begin += 1 # Not found? if begin == end or self._listing[begin].level < level: return slice(begin, begin) r = self._listing[begin] # Not found? if begin == end or r.name != name: return slice(begin, begin) # Found an item, now find its end for end in range(begin + 1, len(self._listing)): if self._listing[end].level <= level: break return slice(begin, end) def __getitem__(self, index: PurePosixPath | tuple[PurePosixPath, int]): with self.lock: return self._listing[self._slice(index)] def __setitem__( self, index: tuple[PurePosixPath, int], value: list[FileEntry] ) -> None: rel, isfile = index with self.lock: if rel.parts: parent = self._slice(rel.parent) if parent.start == parent.stop: raise ValueError( f"Parent folder {rel.as_posix()} is missing for {rel.name}" ) self._listing[self._slice(index)] = value def __delitem__(self, relpath: PurePosixPath): with self.lock: del self._listing[self._slice(relpath)] def _index(self, rel: PurePosixPath): idx = 0 ret = [] def _dir(self, idx: int): level = self._listing[idx].level + 1 end = len(self._listing) idx += 1 ret = [] while idx < end and (r := self._listing[idx]).level >= level: if r.level == level: ret.append(idx) return ret, idx def update(self, rel: PurePosixPath, value: FileEntry): begin = 0 parents = [] while self._listing[begin].level < len(rel.parts): parents.append(begin) state = State() rootpath: Path = None # type: ignore quit = False modified_flags = ( "IN_CREATE", "IN_DELETE", "IN_DELETE_SELF", "IN_MODIFY", "IN_MOVE_SELF", "IN_MOVED_FROM", "IN_MOVED_TO", ) def watcher_thread(loop): global rootpath import inotify.adapters while True: rootpath = config.config.path i = inotify.adapters.InotifyTree(rootpath.as_posix()) # Initialize the tree from filesystem new = walk() with state.lock: old = state.root if old != new: state.root = new broadcast(format_update(old, new), loop) # The watching is not entirely reliable, so do a full refresh every minute refreshdl = time.monotonic() + 60.0 for event in i.event_gen(): if quit: return # Disk usage update du = shutil.disk_usage(rootpath) space = Space(*du, storage=state.root[0].size) if space != state.space: state.space = space broadcast(format_space(space), loop) break # Do a full refresh? if time.monotonic() > refreshdl: break if event is None: continue _, flags, path, filename = event if not any(f in modified_flags for f in flags): continue # Update modified path path = PurePosixPath(path) / filename try: update(path.relative_to(rootpath), loop) except Exception as e: print("Watching error", e, path, rootpath) raise i = None # Free the inotify object def watcher_thread_poll(loop): global rootpath while not quit: rootpath = config.config.path new = walk() with state.lock: old = state.root if old != new: state.root = new broadcast(format_update(old, new), loop) # Disk usage update du = shutil.disk_usage(rootpath) space = Space(*du, storage=state.root[0].size) if space != state.space: state.space = space broadcast(format_space(space), loop) time.sleep(2.0) def walk(rel=PurePosixPath()) -> list[FileEntry]: # noqa: B008 path = rootpath / rel try: st = path.stat() except OSError: return [] return _walk(rel, int(not stat.S_ISDIR(st.st_mode)), st) def _walk(rel: PurePosixPath, isfile: int, st: stat_result) -> list[FileEntry]: entry = FileEntry( level=len(rel.parts), name=rel.name, key=fuid(st), mtime=int(st.st_mtime), size=st.st_size if isfile else 0, isfile=isfile, ) if isfile: return [entry] ret = [entry] path = rootpath / rel try: li = [] for f in path.iterdir(): if f.name.startswith("."): continue # No dotfiles s = f.stat() li.append((int(not stat.S_ISDIR(s.st_mode)), f.name, s)) for [isfile, name, s] in humansorted(li): subtree = _walk(rel / name, isfile, s) child = subtree[0] entry.mtime = max(entry.mtime, child.mtime) entry.size += child.size ret.extend(subtree) except FileNotFoundError: pass # Things may be rapidly in motion except OSError as e: print("OS error walking path", path, e) return ret def update(relpath: PurePosixPath, loop): """Called by inotify updates, check the filesystem and broadcast any changes.""" if rootpath is None or relpath is None: print("ERROR", rootpath, relpath) new = walk(relpath) with state.lock: old = state[relpath] if old == new: return old = state.root if new: state[relpath, new[0].isfile] = new else: del state[relpath] broadcast(format_update(old, state.root), loop) def format_update(old, new): # Make keep/del/insert diff until one of the lists ends oidx, nidx = 0, 0 update = [] keep_count = 0 while oidx < len(old) and nidx < len(new): if old[oidx] == new[nidx]: keep_count += 1 oidx += 1 nidx += 1 continue if keep_count > 0: update.append(UpdKeep(keep_count)) keep_count = 0 del_count = 0 rest = new[nidx:] while oidx < len(old) and old[oidx] not in rest: del_count += 1 oidx += 1 if del_count: update.append(UpdDel(del_count)) continue insert_items = [] rest = old[oidx:] while nidx < len(new) and new[nidx] not in rest: insert_items.append(new[nidx]) nidx += 1 update.append(UpdIns(insert_items)) # Diff any remaining if keep_count > 0: update.append(UpdKeep(keep_count)) if oidx < len(old): update.append(UpdDel(len(old) - oidx)) elif nidx < len(new): update.append(UpdIns(new[nidx:])) return msgspec.json.encode({"update": update}).decode() def format_space(usage): return msgspec.json.encode({"space": usage}).decode() def format_root(root): return msgspec.json.encode({"root": root}).decode() def broadcast(msg, loop): return asyncio.run_coroutine_threadsafe(abroadcast(msg), loop).result() async def abroadcast(msg): try: for queue in pubsub.values(): queue.put_nowait(msg) except Exception: # Log because asyncio would silently eat the error logging.exception("Broadcast error") async def start(app, loop): config.load_config() use_inotify = sys.platform == "linux" app.ctx.watcher = threading.Thread( target=watcher_thread if use_inotify else watcher_thread_poll, args=[loop], ) app.ctx.watcher.start() async def stop(app, loop): global quit quit = True app.ctx.watcher.join()