diff --git a/cista/watching.py b/cista/watching.py
index 0825f90..b36e165 100644
--- a/cista/watching.py
+++ b/cista/watching.py
@@ -1,5 +1,4 @@
 import asyncio
-import os
 import shutil
 import sys
 import threading
@@ -8,7 +7,6 @@ from contextlib import suppress
 from os import stat_result
 from pathlib import Path, PurePosixPath
 from stat import S_ISDIR, S_ISREG
-import signal
 
 import msgspec
 from natsort import humansorted, natsort_keygen, ns
@@ -48,82 +46,47 @@ def treeiter(rootmod):
 
 
 def treeget(rootmod: list[FileEntry], path: PurePosixPath):
-    logger.debug(f"DEBUG: treeget ENTRY: path={path}, rootmod_len={len(rootmod)}")
     begin = None
     ret = []
-    iteration_count = 0
     for i, relpath, entry in treeiter(rootmod):
-        iteration_count += 1
-        if (
-            iteration_count % 1000 == 0
-        ):  # Log every 1000 iterations to detect infinite loops
-            logger.debug(
-                f"DEBUG: treeget iteration {iteration_count}, i={i}, relpath={relpath}, entry.name={entry.name}"
-            )
-
         if begin is None:
             if relpath == path:
-                logger.debug(f"DEBUG: treeget FOUND path {path} at index {i}")
                 begin = i
                 ret.append(entry)
             continue
         if entry.level <= len(path.parts):
-            logger.debug(
-                f"DEBUG: treeget BREAK: entry.level={entry.level} <= path.parts_len={len(path.parts)}"
-            )
             break
         ret.append(entry)
-    logger.debug(
-        f"DEBUG: treeget EXIT: path={path}, begin={begin}, ret_len={len(ret)}, iterations={iteration_count}"
-    )
     return begin, ret
 
 
 def treeinspos(rootmod: list[FileEntry], relpath: PurePosixPath, relfile: int):
     # Find the first entry greater than the new one
     # precondition: the new entry doesn't exist
-    logger.debug(
-        f"DEBUG: treeinspos ENTRY: relpath={relpath}, relfile={relfile}, rootmod_len={len(rootmod)}"
-    )
-
     isfile = 0
     level = 0
     i = 0
     for i, rel, entry in treeiter(rootmod):
-
         if entry.level > level:
             # We haven't found item at level, skip subdirectories
-            logger.debug(
-                f"DEBUG: treeinspos SKIP: entry.level={entry.level} > level={level}"
-            )
             continue
         if entry.level < level:
             # We have passed the level, so the new item is the first
-            logger.debug(
-                f"DEBUG: treeinspos RETURN_EARLY: entry.level={entry.level} < level={level}, returning i={i}"
-            )
             return i
         if level == 0:
             # root
-            logger.debug("DEBUG: treeinspos ROOT: incrementing level from 0 to 1")
            level += 1
             continue
         ename = rel.parts[level - 1]
         name = relpath.parts[level - 1]
-        logger.debug(
-            f"DEBUG: treeinspos COMPARE: ename='{ename}', name='{name}', level={level}"
-        )
         esort = sortkey(ename)
         nsort = sortkey(name)
         # Non-leaf are always folders, only use relfile at leaf
         isfile = relfile if len(relpath.parts) == level else 0
-        logger.debug(
-            f"DEBUG: treeinspos SORT: esort={esort}, nsort={nsort}, isfile={isfile}, entry.isfile={entry.isfile}"
-        )
 
         # First compare by isfile, then by sorting order and if that too matches then case sensitive
         cmp = (
@@ -131,27 +94,21 @@ def treeinspos(rootmod: list[FileEntry], relpath: PurePosixPath, relfile: int):
             or (esort > nsort) - (esort < nsort)
             or (ename > name) - (ename < name)
         )
-        logger.debug(f"DEBUG: treeinspos CMP: cmp={cmp}")
         if cmp > 0:
-            logger.debug(f"DEBUG: treeinspos RETURN: cmp > 0, returning i={i}")
             return i
         if cmp < 0:
-            logger.debug("DEBUG: treeinspos CONTINUE: cmp < 0")
             continue
-        logger.debug(f"DEBUG: treeinspos INCREMENT_LEVEL: level {level} -> {level + 1}")
         level += 1
         if level > len(relpath.parts):
             logger.error(
-                f"ERROR: insertpos level overflow: relpath={relpath}, i={i}, entry.name={entry.name}, entry.level={entry.level}, level={level}"
+                f"insertpos level overflow: relpath={relpath}, i={i}, entry.name={entry.name}, entry.level={entry.level}, level={level}"
             )
             break
     else:
-        logger.debug(f"DEBUG: treeinspos FOR_ELSE: incrementing i from {i} to {i + 1}")
         i += 1
-    logger.debug(f"DEBUG: treeinspos EXIT: returning i={i}")
     return i
 
 
@@ -159,9 +116,6 @@ state = State()
 rootpath: Path = None  # type: ignore
 quit = threading.Event()
 
-# Keep a reference so the file stays open for faulthandler outputs
-_faulthandler_file = None  # type: ignore
-
 ## Filesystem scanning
 
 
@@ -233,65 +187,18 @@ def update_root(loop):
 
 def update_path(rootmod: list[FileEntry], relpath: PurePosixPath, loop):
     """Called on FS updates, check the filesystem and broadcast any changes."""
-    logger.debug(
-        f"DEBUG: update_path ENTRY: path={relpath}, rootmod_len={len(rootmod)}"
-    )
-
-    # Add timing for walk operation
-    walk_start = time.perf_counter()
     new = walk(relpath)
-    walk_end = time.perf_counter()
-    logger.debug(
-        f"DEBUG: walk({relpath}) took {walk_end - walk_start:.4f}s, returned {len(new)} entries"
-    )
-
-    # Add timing for treeget operation
-    treeget_start = time.perf_counter()
     obegin, old = treeget(rootmod, relpath)
-    treeget_end = time.perf_counter()
-    logger.debug(
-        f"DEBUG: treeget({relpath}) took {treeget_end - treeget_start:.4f}s, obegin={obegin}, old_len={len(old) if old else 0}"
-    )
 
     if old == new:
-        logger.debug(
-            f"Watch: Event without changes needed {relpath}"
-            if old
-            else f"Watch: Event with old and new missing: {relpath}"
-        )
-        logger.debug(f"DEBUG: update_path EARLY_EXIT: no changes for {relpath}")
         return
 
-    # Debug the deletion operation
     if obegin is not None:
-        logger.debug(
-            f"DEBUG: DELETING entries from rootmod[{obegin}:{obegin + len(old)}] for path {relpath}"
-        )
         del rootmod[obegin : obegin + len(old)]
-        logger.debug(f"DEBUG: DELETED entries, rootmod_len now {len(rootmod)}")
 
     if new:
-        logger.debug(f"Watch: Update {relpath}" if old else f"Watch: Created {relpath}")
-
-        # Add timing for treeinspos operation - this is where hangs might occur
-        inspos_start = time.perf_counter()
         i = treeinspos(rootmod, relpath, new[0].isfile)
-        inspos_end = time.perf_counter()
-        logger.debug(
-            f"DEBUG: treeinspos({relpath}) took {inspos_end - inspos_start:.4f}s, returned index={i}"
-        )
-
-        logger.debug(
-            f"DEBUG: INSERTING {len(new)} entries at position {i} for path {relpath}"
-        )
         rootmod[i:i] = new
-        logger.debug(f"DEBUG: INSERTED entries, rootmod_len now {len(rootmod)}")
-    else:
-        logger.debug(f"Watch: Removed {relpath}")
-
-    logger.debug(
-        f"DEBUG: update_path EXIT: path={relpath}, final_rootmod_len={len(rootmod)}"
-    )
 
 
 def update_space(loop):
@@ -311,8 +218,6 @@
 
 
 def format_update(old, new):
-    logger.debug(f"DEBUG: format_update ENTRY: old_len={len(old)}, new_len={len(new)}")
-
     # Make keep/del/insert diff until one of the lists ends
     oidx, nidx = 0, 0
     oremain, nremain = set(old), set(new)
@@ -327,16 +232,10 @@
     while oidx < len(old) and nidx < len(new):
         iteration_count += 1
-        # Log every 1000 iterations to detect infinite loops
-        if iteration_count % 1000 == 0:
-            logger.debug(
-                f"DEBUG: format_update iteration {iteration_count}, oidx={oidx}/{len(old)}, nidx={nidx}/{len(new)}"
-            )
-
         # Emergency brake for potential infinite loops
         if iteration_count > 50000:
             logger.error(
-                f"ERROR: format_update potential infinite loop! iteration={iteration_count}, oidx={oidx}, nidx={nidx}"
+                f"format_update potential infinite loop! iteration={iteration_count}, oidx={oidx}, nidx={nidx}"
             )
             raise Exception(
                 f"format_update infinite loop detected at iteration {iteration_count}"
             )
@@ -346,54 +245,36 @@
         # Matching entries are kept
         if old[oidx] == new[nidx]:
             entry = old[oidx]
-            logger.debug(
-                f"DEBUG: format_update MATCH: entry={entry.name}, oidx={oidx}, nidx={nidx}"
-            )
-            oremain.remove(entry)
-            nremain.remove(entry)
+            oremain.discard(entry)
+            nremain.discard(entry)
             keep_count += 1
             oidx += 1
             nidx += 1
             continue
 
         if keep_count > 0:
-            logger.debug(f"DEBUG: format_update KEEP: adding UpdKeep({keep_count})")
             modified = True
             update.append(UpdKeep(keep_count))
             keep_count = 0
 
         # Items only in old are deleted
         del_count = 0
-        del_start_oidx = oidx
         while oidx < len(old) and old[oidx] not in nremain:
-            logger.debug(
-                f"DEBUG: format_update DELETE: removing old[{oidx}]={old[oidx].name}"
-            )
             oremain.remove(old[oidx])
             del_count += 1
             oidx += 1
         if del_count:
-            logger.debug(
-                f"DEBUG: format_update DEL: adding UpdDel({del_count}), oidx {del_start_oidx}->{oidx}"
-            )
             update.append(UpdDel(del_count))
             continue
 
         # Items only in new are inserted
         insert_items = []
-        ins_start_nidx = nidx
         while nidx < len(new) and new[nidx] not in oremain:
             entry = new[nidx]
-            logger.debug(
-                f"DEBUG: format_update INSERT: adding new[{nidx}]={entry.name}"
-            )
             nremain.remove(entry)
             insert_items.append(entry)
             nidx += 1
         if insert_items:
-            logger.debug(
-                f"DEBUG: format_update INS: adding UpdIns({len(insert_items)} items), nidx {ins_start_nidx}->{nidx}"
-            )
             modified = True
             update.append(UpdIns(insert_items))
 
@@ -419,36 +300,20 @@ def format_update(old, new):
             oremain.discard(cur_old)
             update.append(UpdDel(1))
             oidx += 1
-            logger.debug(
-                f"DEBUG: format_update TIEBREAK_DEL: oidx->{oidx}, cur_old={cur_old.name}"
-            )
         else:
             # Insert current new item
             nremain.discard(cur_new)
             update.append(UpdIns([cur_new]))
             nidx += 1
-            logger.debug(
-                f"DEBUG: format_update TIEBREAK_INS: nidx->{nidx}, cur_new={cur_new.name}"
-            )
 
     # Diff any remaining
     if keep_count > 0:
-        logger.debug(f"DEBUG: format_update FINAL_KEEP: adding UpdKeep({keep_count})")
         update.append(UpdKeep(keep_count))
     if oremain:
-        logger.debug(
-            f"DEBUG: format_update FINAL_DEL: adding UpdDel({len(oremain)}) for remaining old items"
-        )
         update.append(UpdDel(len(oremain)))
     elif nremain:
-        logger.debug(
-            f"DEBUG: format_update FINAL_INS: adding UpdIns({len(new[nidx:])}) for remaining new items"
-        )
         update.append(UpdIns(new[nidx:]))
 
-    logger.debug(
-        f"DEBUG: format_update EXIT: generated {len(update)} operations, iterations={iteration_count}"
-    )
     return msgspec.json.encode({"update": update}).decode()
 
 
@@ -492,10 +357,7 @@ def watcher_inotify(loop):
     while not quit.is_set():
         i = inotify.adapters.InotifyTree(rootpath.as_posix())
         # Initialize the tree from filesystem
-        t0 = time.perf_counter()
         update_root(loop)
-        t1 = time.perf_counter()
-        logger.debug(f"Root update took {t1 - t0:.1f}s")
         trefresh = time.monotonic() + 300.0
         tspace = time.monotonic() + 5.0
         # Watch for changes (frequent wakeups needed for quiting)
@@ -516,57 +378,29 @@ def watcher_inotify(loop):
             if quit.is_set():
                 return
             interesting = any(f in modified_flags for f in event[1])
-            if event[2] == rootpath.as_posix() and event[3] == "zzz":
-                logger.debug(f"Watch: {interesting=} {event=}")
             if interesting:
                 # Update modified path
-                logger.debug(
-                    f"DEBUG: inotify PROCESSING: event={event}, path={event[2]}/{event[3]}"
-                )
-                t0 = time.perf_counter()
                 path = PurePosixPath(event[2]) / event[3]
                 try:
                     rel_path = path.relative_to(rootpath)
-                    logger.debug(
-                        f"DEBUG: inotify CALLING update_path: rel_path={rel_path}"
-                    )
                     update_path(rootmod, rel_path, loop)
-                    logger.debug(
-                        f"DEBUG: inotify update_path COMPLETED: rel_path={rel_path}"
-                    )
                 except Exception as e:
                     logger.error(
-                        f"ERROR: inotify update_path FAILED: path={path}, error={e}"
+                        f"Error processing inotify event for path {path}: {e}"
                     )
                     raise
-                t1 = time.perf_counter()
-                logger.debug(f"Watch: Update {event[3]} took {t1 - t0:.1f}s")
                 if not dirty:
                     t = time.monotonic()
                     dirty = True
             # Wait a maximum of 0.5s to push the updates
             if dirty and time.monotonic() >= t + 0.5:
-                logger.debug("DEBUG: inotify TIMEOUT: breaking due to 0.5s timeout")
                 break
         if dirty and state.root != rootmod:
-            logger.debug(
-                f"DEBUG: inotify BATCH_UPDATE: state.root_len={len(state.root)}, rootmod_len={len(rootmod)}"
-            )
-            t0 = time.perf_counter()
-            logger.debug("DEBUG: inotify CALLING format_update")
             try:
                 update = format_update(state.root, rootmod)
-                logger.debug("DEBUG: inotify format_update COMPLETED")
-                t1 = time.perf_counter()
                 with state.lock:
-                    logger.debug("DEBUG: inotify BROADCASTING update")
                     broadcast(update, loop)
                     state.root = rootmod
-                    logger.debug("DEBUG: inotify BROADCAST completed, state updated")
-                    t2 = time.perf_counter()
-                    logger.debug(
-                        f"Format update took {t1 - t0:.1f}s, broadcast {t2 - t1:.1f}s"
-                    )
             except Exception:
                 logger.exception(
                     "format_update failed; falling back to full rescan"
                 )
@@ -579,7 +413,6 @@ def watcher_inotify(loop):
                     with state.lock:
                         broadcast(update, loop)
                         state.root = fresh
-                    logger.debug("Fallback diff succeeded after full rescan")
                 except Exception:
                     logger.exception(
                         "Fallback diff failed; sending full root snapshot"
                     )
@@ -611,29 +444,6 @@ async def start(app, loop):
     global rootpath
     config.load_config()
     rootpath = config.config.path
-    # Optional: enable SIGUSR1 stack dumps in production for debugging hangs
-    # Control with env CISTA_STACK_DUMP (default: enabled). Sends all thread
-    # stacks to a per-process log in /tmp when receiving SIGUSR1.
-    if os.environ.get("CISTA_STACK_DUMP", "1") == "1":
-        try:
-            import faulthandler
-
-            global _faulthandler_file
-            if _faulthandler_file is None:
-                log_path = f"/tmp/cista-stacks-{os.getpid()}.log"
-                # Line-buffered text file so writes appear promptly
-                _faulthandler_file = open(log_path, "a", buffering=1)
-            faulthandler.enable(file=_faulthandler_file)
-            faulthandler.register(
-                signal.SIGUSR1, file=_faulthandler_file, all_threads=True, chain=True
-            )
-            logger.info(
-                "Stack dump enabled: send SIGUSR1 to PID %s to write all thread stacks to %s",
-                os.getpid(),
-                log_path,
-            )
-        except Exception:
-            logger.exception("Failed to enable SIGUSR1 stack dump handler")
     use_inotify = sys.platform == "linux"
     app.ctx.watcher = threading.Thread(
         target=watcher_inotify if use_inotify else watcher_poll,