"""Filesystem watcher for cista.

Maintains a flattened tree of ``FileEntry`` objects for the configured root
directory, detects changes (via inotify on Linux, polling elsewhere) and
broadcasts JSON diffs to all connected websocket clients via ``pubsub``.

This module is heavily instrumented with DEBUG logging because tree updates
have historically hung or looped; the iteration counters and emergency brakes
below exist to diagnose that.
"""

import asyncio
import os
import shutil
import signal
import sys
import threading
import time
from contextlib import suppress
from os import stat_result
from pathlib import Path, PurePosixPath
from stat import S_ISDIR, S_ISREG

import msgspec
from natsort import humansorted, natsort_keygen, ns
from sanic.log import logger

from cista import config
from cista.fileio import fuid
from cista.protocol import FileEntry, Space, UpdDel, UpdIns, UpdKeep

# Websocket subscribers: maps subscription id -> asyncio.Queue of messages.
pubsub = {}
# Locale-aware natural sort key, shared by the whole module.
sortkey = natsort_keygen(alg=ns.LOCALE)


class State:
    """Thread-safe holder for the current tree and disk-space figures.

    ``lock`` must be held when swapping ``root`` together with broadcasting,
    so clients never observe a diff against the wrong baseline.
    """

    def __init__(self):
        self.lock = threading.RLock()
        self._space = Space(0, 0, 0, 0)
        # Flattened preorder list of FileEntry; index 0 is the root directory.
        self.root: list[FileEntry] = []

    @property
    def space(self):
        with self.lock:
            return self._space

    @space.setter
    def space(self, space):
        with self.lock:
            self._space = space


def treeiter(rootmod):
    """Yield ``(index, relative_path, entry)`` for each entry of the flat tree.

    The relative path is reconstructed incrementally from each entry's
    ``level`` and ``name`` fields.
    """
    relpath = PurePosixPath()
    for i, entry in enumerate(rootmod):
        if entry.level > 0:
            relpath = PurePosixPath(*relpath.parts[: entry.level - 1]) / entry.name
        yield i, relpath, entry


def treeget(rootmod: list[FileEntry], path: PurePosixPath):
    """Locate ``path`` in the flat tree.

    Returns ``(begin, entries)`` where ``begin`` is the index of the entry at
    ``path`` (or ``None`` if absent) and ``entries`` is that entry followed by
    its whole subtree (entries with a deeper ``level``).
    """
    logger.debug(f"DEBUG: treeget ENTRY: path={path}, rootmod_len={len(rootmod)}")
    begin = None
    ret = []
    iteration_count = 0
    for i, relpath, entry in treeiter(rootmod):
        iteration_count += 1
        if (
            iteration_count % 1000 == 0
        ):  # Log every 1000 iterations to detect infinite loops
            logger.debug(
                f"DEBUG: treeget iteration {iteration_count}, i={i}, relpath={relpath}, entry.name={entry.name}"
            )
        if begin is None:
            if relpath == path:
                logger.debug(f"DEBUG: treeget FOUND path {path} at index {i}")
                begin = i
                ret.append(entry)
            continue
        # Once found, keep collecting until the level drops back to (or above)
        # the target's depth, i.e. the subtree ends.
        if entry.level <= len(path.parts):
            logger.debug(
                f"DEBUG: treeget BREAK: entry.level={entry.level} <= path.parts_len={len(path.parts)}"
            )
            break
        ret.append(entry)
    logger.debug(
        f"DEBUG: treeget EXIT: path={path}, begin={begin}, ret_len={len(ret)}, iterations={iteration_count}"
    )
    return begin, ret


def treeinspos(rootmod: list[FileEntry], relpath: PurePosixPath, relfile: int):
    """Find the insertion index for a new entry at ``relpath``.

    Returns the index of the first entry that sorts after the new one.
    Precondition: the new entry does not already exist in ``rootmod``.
    ``relfile`` is 1 when the new leaf is a file, 0 for a directory
    (directories sort before files at the same level).
    """
    logger.debug(
        f"DEBUG: treeinspos ENTRY: relpath={relpath}, relfile={relfile}, rootmod_len={len(rootmod)}"
    )
    isfile = 0
    level = 0
    i = 0
    iteration_count = 0
    for i, rel, entry in treeiter(rootmod):
        iteration_count += 1
        # Detect potential infinite loops in treeinspos
        if iteration_count % 1000 == 0:
            logger.debug(
                f"DEBUG: treeinspos iteration {iteration_count}, i={i}, rel={rel}, entry.name={entry.name}, level={level}, entry.level={entry.level}"
            )
        if iteration_count > 10000:  # Emergency brake for infinite loops
            logger.error(
                f"ERROR: treeinspos potential infinite loop! iteration={iteration_count}, relpath={relpath}, i={i}, level={level}"
            )
            break
        if entry.level > level:
            # We haven't found item at level, skip subdirectories
            logger.debug(
                f"DEBUG: treeinspos SKIP: entry.level={entry.level} > level={level}"
            )
            continue
        if entry.level < level:
            # We have passed the level, so the new item is the first
            logger.debug(
                f"DEBUG: treeinspos RETURN_EARLY: entry.level={entry.level} < level={level}, returning i={i}"
            )
            return i
        if level == 0:
            # root
            logger.debug("DEBUG: treeinspos ROOT: incrementing level from 0 to 1")
            level += 1
            continue
        ename = rel.parts[level - 1]
        name = relpath.parts[level - 1]
        logger.debug(
            f"DEBUG: treeinspos COMPARE: ename='{ename}', name='{name}', level={level}"
        )
        esort = sortkey(ename)
        nsort = sortkey(name)
        # Non-leaf are always folders, only use relfile at leaf
        isfile = relfile if len(relpath.parts) == level else 0
        logger.debug(
            f"DEBUG: treeinspos SORT: esort={esort}, nsort={nsort}, isfile={isfile}, entry.isfile={entry.isfile}"
        )
        # First compare by isfile, then by sorting order and if that too matches then case sensitive
        cmp = (
            entry.isfile - isfile
            or (esort > nsort) - (esort < nsort)
            or (ename > name) - (ename < name)
        )
        logger.debug(f"DEBUG: treeinspos CMP: cmp={cmp}")
        if cmp > 0:
            logger.debug(f"DEBUG: treeinspos RETURN: cmp > 0, returning i={i}")
            return i
        if cmp < 0:
            logger.debug(f"DEBUG: treeinspos CONTINUE: cmp < 0")
            continue
        # Same component on this level: descend one level deeper.
        logger.debug(f"DEBUG: treeinspos INCREMENT_LEVEL: level {level} -> {level + 1}")
        level += 1
        if level > len(relpath.parts):
            logger.error(
                f"ERROR: insertpos level overflow: relpath={relpath}, i={i}, entry.name={entry.name}, entry.level={entry.level}, level={level}"
            )
            break
    else:
        # Loop ran to completion: insert after the last examined entry.
        logger.debug(f"DEBUG: treeinspos FOR_ELSE: incrementing i from {i} to {i + 1}")
        i += 1
    logger.debug(
        f"DEBUG: treeinspos EXIT: returning i={i}, iterations={iteration_count}"
    )
    return i


state = State()
rootpath: Path = None  # type: ignore  # set in start() from config
quit = threading.Event()  # NOTE: intentionally module-level; shadows builtins.quit

# Keep a reference so the file stays open for faulthandler outputs
_faulthandler_file = None  # type: ignore

## Filesystem scanning


def walk(rel: PurePosixPath, stat: stat_result | None = None) -> list[FileEntry]:
    """Recursively scan ``rootpath / rel``.

    Returns a flat preorder list of FileEntry whose first element describes
    ``rel`` itself, with directory size/mtime aggregated from all descendants.
    Dotfiles and anything that is neither a regular file nor a directory are
    skipped. Returns [] if the path vanished meanwhile.
    """
    path = rootpath / rel
    ret = []
    try:
        st = stat or path.stat()
        isfile = int(not S_ISDIR(st.st_mode))
        entry = FileEntry(
            level=len(rel.parts),
            name=rel.name,
            key=fuid(st),
            mtime=int(st.st_mtime),
            size=st.st_size if isfile else 0,
            isfile=isfile,
        )
        if isfile:
            return [entry]
        # Walk all entries of the directory.
        # ret[0] is a placeholder (Ellipsis) replaced at the end by the
        # aggregated directory entry.
        ret: list[FileEntry] = [...]  # type: ignore
        li = []
        for f in path.iterdir():
            if quit.is_set():
                raise SystemExit("quit")
            if f.name.startswith("."):
                continue  # No dotfiles
            with suppress(FileNotFoundError):
                s = f.lstat()
                isfile = S_ISREG(s.st_mode)
                isdir = S_ISDIR(s.st_mode)
                if not isfile and not isdir:
                    continue
                li.append((int(isfile), f.name, s))
        # Build the tree as a list of FileEntries (directories before files,
        # names in locale-aware natural order).
        for [_, name, s] in humansorted(li):
            sub = walk(rel / name, stat=s)
            child = sub[0]
            # Aggregate child size and newest mtime into the directory entry.
            entry = FileEntry(
                level=entry.level,
                name=entry.name,
                key=entry.key,
                size=entry.size + child.size,
                mtime=max(entry.mtime, child.mtime),
                isfile=entry.isfile,
            )
            ret.extend(sub)
    except FileNotFoundError:
        pass  # Things may be rapidly in motion
    except OSError as e:
        # Permission denied is expected noise; log everything else.
        if e.errno != 13:
            logger.error(f"Watching {path=}: {e!r}")
    if ret:
        ret[0] = entry
    return ret


def update_root(loop):
    """Full filesystem scan"""
    old = state.root
    new = walk(PurePosixPath())
    if old != new:
        update = format_update(old, new)
        with state.lock:
            broadcast(update, loop)
            state.root = new


def update_path(rootmod: list[FileEntry], relpath: PurePosixPath, loop):
    """Called on FS updates, check the filesystem and broadcast any changes."""
    logger.debug(
        f"DEBUG: update_path ENTRY: path={relpath}, rootmod_len={len(rootmod)}"
    )
    # Add timing for walk operation
    walk_start = time.perf_counter()
    new = walk(relpath)
    walk_end = time.perf_counter()
    logger.debug(
        f"DEBUG: walk({relpath}) took {walk_end - walk_start:.4f}s, returned {len(new)} entries"
    )
    # Add timing for treeget operation
    treeget_start = time.perf_counter()
    obegin, old = treeget(rootmod, relpath)
    treeget_end = time.perf_counter()
    logger.debug(
        f"DEBUG: treeget({relpath}) took {treeget_end - treeget_start:.4f}s, obegin={obegin}, old_len={len(old) if old else 0}"
    )
    if old == new:
        logger.debug(
            f"Watch: Event without changes needed {relpath}"
            if old
            else f"Watch: Event with old and new missing: {relpath}"
        )
        logger.debug(f"DEBUG: update_path EARLY_EXIT: no changes for {relpath}")
        return
    # Remove the old subtree (if any), then insert the fresh one in its
    # sorted position.
    if obegin is not None:
        logger.debug(
            f"DEBUG: DELETING entries from rootmod[{obegin}:{obegin + len(old)}] for path {relpath}"
        )
        del rootmod[obegin : obegin + len(old)]
        logger.debug(f"DEBUG: DELETED entries, rootmod_len now {len(rootmod)}")
    if new:
        logger.debug(f"Watch: Update {relpath}" if old else f"Watch: Created {relpath}")
        # Add timing for treeinspos operation - this is where hangs might occur
        inspos_start = time.perf_counter()
        i = treeinspos(rootmod, relpath, new[0].isfile)
        inspos_end = time.perf_counter()
        logger.debug(
            f"DEBUG: treeinspos({relpath}) took {inspos_end - inspos_start:.4f}s, returned index={i}"
        )
        logger.debug(
            f"DEBUG: INSERTING {len(new)} entries at position {i} for path {relpath}"
        )
        rootmod[i:i] = new
        logger.debug(f"DEBUG: INSERTED entries, rootmod_len now {len(rootmod)}")
    else:
        logger.debug(f"Watch: Removed {relpath}")
    logger.debug(
        f"DEBUG: update_path EXIT: path={relpath}, final_rootmod_len={len(rootmod)}"
    )


def update_space(loop):
    """Called periodically to update the disk usage."""
    du = shutil.disk_usage(rootpath)
    # Guard against an empty tree (no scan completed yet).
    space = Space(*du, storage=state.root[0].size if state.root else 0)
    # Update only on difference above 1 MB
    tol = 10**6
    old = msgspec.structs.astuple(state.space)
    new = msgspec.structs.astuple(space)
    if any(abs(o - n) > tol for o, n in zip(old, new, strict=True)):
        state.space = space
        broadcast(format_space(space), loop)


## Messaging


def format_update(old, new):
    """Diff two flat trees into a keep/delete/insert update message (JSON str).

    Raises if no progress is made in an iteration (would otherwise loop
    forever on inconsistent input).
    """
    logger.debug(f"DEBUG: format_update ENTRY: old_len={len(old)}, new_len={len(new)}")
    # Make keep/del/insert diff until one of the lists ends
    oidx, nidx = 0, 0
    oremain, nremain = set(old), set(new)
    update = []
    keep_count = 0
    iteration_count = 0
    while oidx < len(old) and nidx < len(new):
        iteration_count += 1
        # Log every 1000 iterations to detect infinite loops
        if iteration_count % 1000 == 0:
            logger.debug(
                f"DEBUG: format_update iteration {iteration_count}, oidx={oidx}/{len(old)}, nidx={nidx}/{len(new)}"
            )
        # Emergency brake for potential infinite loops
        if iteration_count > 50000:
            logger.error(
                f"ERROR: format_update potential infinite loop! iteration={iteration_count}, oidx={oidx}, nidx={nidx}"
            )
            raise Exception(
                f"format_update infinite loop detected at iteration {iteration_count}"
            )
        modified = False
        # Matching entries are kept
        if old[oidx] == new[nidx]:
            entry = old[oidx]
            logger.debug(
                f"DEBUG: format_update MATCH: entry={entry.name}, oidx={oidx}, nidx={nidx}"
            )
            oremain.remove(entry)
            nremain.remove(entry)
            keep_count += 1
            oidx += 1
            nidx += 1
            continue
        if keep_count > 0:
            logger.debug(f"DEBUG: format_update KEEP: adding UpdKeep({keep_count})")
            modified = True
            update.append(UpdKeep(keep_count))
            keep_count = 0
        # Items only in old are deleted
        del_count = 0
        del_start_oidx = oidx
        while oidx < len(old) and old[oidx] not in nremain:
            logger.debug(
                f"DEBUG: format_update DELETE: removing old[{oidx}]={old[oidx].name}"
            )
            oremain.remove(old[oidx])
            del_count += 1
            oidx += 1
        if del_count:
            logger.debug(
                f"DEBUG: format_update DEL: adding UpdDel({del_count}), oidx {del_start_oidx}->{oidx}"
            )
            update.append(UpdDel(del_count))
            continue
        # Items only in new are inserted
        insert_items = []
        ins_start_nidx = nidx
        while nidx < len(new) and new[nidx] not in oremain:
            entry = new[nidx]
            logger.debug(
                f"DEBUG: format_update INSERT: adding new[{nidx}]={entry.name}"
            )
            nremain.remove(entry)
            insert_items.append(entry)
            nidx += 1
        if insert_items:
            logger.debug(
                f"DEBUG: format_update INS: adding UpdIns({len(insert_items)} items), nidx {ins_start_nidx}->{nidx}"
            )
            modified = True
            update.append(UpdIns(insert_items))
        if not modified:
            logger.error(
                f"ERROR: format_update INFINITE_LOOP: nidx={nidx}, oidx={oidx}, old_len={len(old)}, new_len={len(new)}"
            )
            logger.error(
                f"ERROR: old[oidx]={old[oidx].name if oidx < len(old) else 'OUT_OF_BOUNDS'}"
            )
            logger.error(
                f"ERROR: new[nidx]={new[nidx].name if nidx < len(new) else 'OUT_OF_BOUNDS'}"
            )
            raise Exception(
                f"Infinite loop in diff {nidx=} {oidx=} {len(old)=} {len(new)=}"
            )
    # Diff any remaining
    if keep_count > 0:
        logger.debug(f"DEBUG: format_update FINAL_KEEP: adding UpdKeep({keep_count})")
        update.append(UpdKeep(keep_count))
    if oremain:
        logger.debug(
            f"DEBUG: format_update FINAL_DEL: adding UpdDel({len(oremain)}) for remaining old items"
        )
        update.append(UpdDel(len(oremain)))
    elif nremain:
        logger.debug(
            f"DEBUG: format_update FINAL_INS: adding UpdIns({len(new[nidx:])}) for remaining new items"
        )
        update.append(UpdIns(new[nidx:]))
    logger.debug(
        f"DEBUG: format_update EXIT: generated {len(update)} operations, iterations={iteration_count}"
    )
    return msgspec.json.encode({"update": update}).decode()


def format_space(usage):
    """Encode a Space tuple as a JSON message string."""
    return msgspec.json.encode({"space": usage}).decode()


def format_root(root):
    """Encode the full tree as a JSON message string."""
    return msgspec.json.encode({"root": root}).decode()


def broadcast(msg, loop):
    """Send ``msg`` to all subscribers from a watcher thread (blocks until queued)."""
    return asyncio.run_coroutine_threadsafe(abroadcast(msg), loop).result()


async def abroadcast(msg):
    """Push ``msg`` into every subscriber queue (runs on the event loop)."""
    try:
        for queue in pubsub.values():
            queue.put_nowait(msg)
    except Exception:
        # Log because asyncio would silently eat the error
        logger.exception("Broadcast error")


## Watcher thread


def watcher_inotify(loop):
    """Inotify watcher thread (Linux only)"""
    import inotify.adapters

    modified_flags = (
        "IN_CREATE",
        "IN_DELETE",
        "IN_DELETE_SELF",
        "IN_MODIFY",
        "IN_MOVE_SELF",
        "IN_MOVED_FROM",
        "IN_MOVED_TO",
    )
    while not quit.is_set():
        i = inotify.adapters.InotifyTree(rootpath.as_posix())
        # Initialize the tree from filesystem
        t0 = time.perf_counter()
        update_root(loop)
        t1 = time.perf_counter()
        logger.debug(f"Root update took {t1 - t0:.1f}s")
        trefresh = time.monotonic() + 300.0
        tspace = time.monotonic() + 5.0
        # Watch for changes (frequent wakeups needed for quiting)
        while not quit.is_set():
            t = time.monotonic()
            # The watching is not entirely reliable, so do a full refresh every 5 minutes
            if t >= trefresh:
                break
            # Disk usage update
            if t >= tspace:
                tspace = time.monotonic() + 5.0
                update_space(loop)
            # Inotify events, update the tree
            dirty = False
            rootmod = state.root[:]
            for event in i.event_gen(yield_nones=False, timeout_s=0.1):
                assert event
                if quit.is_set():
                    return
                interesting = any(f in modified_flags for f in event[1])
                # NOTE(review): debug-only trace for a hardcoded test filename
                # "zzz" — presumably leftover instrumentation; confirm before removing.
                if event[2] == rootpath.as_posix() and event[3] == "zzz":
                    logger.debug(f"Watch: {interesting=} {event=}")
                if interesting:
                    # Update modified path
                    logger.debug(
                        f"DEBUG: inotify PROCESSING: event={event}, path={event[2]}/{event[3]}"
                    )
                    t0 = time.perf_counter()
                    path = PurePosixPath(event[2]) / event[3]
                    try:
                        rel_path = path.relative_to(rootpath)
                        logger.debug(
                            f"DEBUG: inotify CALLING update_path: rel_path={rel_path}"
                        )
                        update_path(rootmod, rel_path, loop)
                        logger.debug(
                            f"DEBUG: inotify update_path COMPLETED: rel_path={rel_path}"
                        )
                    except Exception as e:
                        logger.error(
                            f"ERROR: inotify update_path FAILED: path={path}, error={e}"
                        )
                        raise
                    t1 = time.perf_counter()
                    logger.debug(f"Watch: Update {event[3]} took {t1 - t0:.1f}s")
                    if not dirty:
                        t = time.monotonic()
                        dirty = True
                # Wait a maximum of 0.5s to push the updates
                if dirty and time.monotonic() >= t + 0.5:
                    logger.debug("DEBUG: inotify TIMEOUT: breaking due to 0.5s timeout")
                    break
            if dirty and state.root != rootmod:
                logger.debug(
                    f"DEBUG: inotify BATCH_UPDATE: state.root_len={len(state.root)}, rootmod_len={len(rootmod)}"
                )
                t0 = time.perf_counter()
                logger.debug("DEBUG: inotify CALLING format_update")
                update = format_update(state.root, rootmod)
                logger.debug("DEBUG: inotify format_update COMPLETED")
                t1 = time.perf_counter()
                with state.lock:
                    logger.debug("DEBUG: inotify BROADCASTING update")
                    broadcast(update, loop)
                    state.root = rootmod
                    logger.debug("DEBUG: inotify BROADCAST completed, state updated")
                t2 = time.perf_counter()
                logger.debug(
                    f"Format update took {t1 - t0:.1f}s, broadcast {t2 - t1:.1f}s"
                )
        del i  # Free the inotify object


def watcher_poll(loop):
    """Polling version of the watcher thread."""
    while not quit.is_set():
        t0 = time.perf_counter()
        update_root(loop)
        update_space(loop)
        dur = time.perf_counter() - t0
        if dur > 1.0:
            logger.debug(f"Reading the full file list took {dur:.1f}s")
        # Back off proportionally to how long the scan took.
        quit.wait(0.1 + 8 * dur)


async def start(app, loop):
    """Sanic startup hook: load config and launch the watcher thread."""
    global rootpath
    config.load_config()
    rootpath = config.config.path
    # Optional: enable SIGUSR1 stack dumps in production for debugging hangs
    # Control with env CISTA_STACK_DUMP (default: enabled). Sends all thread
    # stacks to a per-process log in /tmp when receiving SIGUSR1.
    if os.environ.get("CISTA_STACK_DUMP", "1") == "1":
        try:
            import faulthandler

            global _faulthandler_file
            if _faulthandler_file is None:
                log_path = f"/tmp/cista-stacks-{os.getpid()}.log"
                # Line-buffered text file so writes appear promptly
                _faulthandler_file = open(log_path, "a", buffering=1)
                faulthandler.enable(file=_faulthandler_file)
                faulthandler.register(
                    signal.SIGUSR1, file=_faulthandler_file, all_threads=True, chain=True
                )
                logger.info(
                    "Stack dump enabled: send SIGUSR1 to PID %s to write all thread stacks to %s",
                    os.getpid(),
                    log_path,
                )
        except Exception:
            logger.exception("Failed to enable SIGUSR1 stack dump handler")
    use_inotify = sys.platform == "linux"
    app.ctx.watcher = threading.Thread(
        target=watcher_inotify if use_inotify else watcher_poll,
        args=[loop],
        # Descriptive name for system monitoring
        name=f"cista-watcher {rootpath}",
    )
    app.ctx.watcher.start()


async def stop(app, loop):
    """Sanic shutdown hook: signal the watcher thread and wait for it."""
    quit.set()
    app.ctx.watcher.join()