Update the project to run with modern uv/bun (or python/nodejs) environment #7
@ -1,5 +1,4 @@
|
||||
import asyncio
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import threading
|
||||
@ -8,7 +7,6 @@ from contextlib import suppress
|
||||
from os import stat_result
|
||||
from pathlib import Path, PurePosixPath
|
||||
from stat import S_ISDIR, S_ISREG
|
||||
import signal
|
||||
|
||||
import msgspec
|
||||
from natsort import humansorted, natsort_keygen, ns
|
||||
@ -48,82 +46,47 @@ def treeiter(rootmod):
|
||||
|
||||
|
||||
def treeget(rootmod: list[FileEntry], path: PurePosixPath):
|
||||
logger.debug(f"DEBUG: treeget ENTRY: path={path}, rootmod_len={len(rootmod)}")
|
||||
begin = None
|
||||
ret = []
|
||||
iteration_count = 0
|
||||
|
||||
for i, relpath, entry in treeiter(rootmod):
|
||||
iteration_count += 1
|
||||
if (
|
||||
iteration_count % 1000 == 0
|
||||
): # Log every 1000 iterations to detect infinite loops
|
||||
logger.debug(
|
||||
f"DEBUG: treeget iteration {iteration_count}, i={i}, relpath={relpath}, entry.name={entry.name}"
|
||||
)
|
||||
|
||||
if begin is None:
|
||||
if relpath == path:
|
||||
logger.debug(f"DEBUG: treeget FOUND path {path} at index {i}")
|
||||
begin = i
|
||||
ret.append(entry)
|
||||
continue
|
||||
if entry.level <= len(path.parts):
|
||||
logger.debug(
|
||||
f"DEBUG: treeget BREAK: entry.level={entry.level} <= path.parts_len={len(path.parts)}"
|
||||
)
|
||||
break
|
||||
ret.append(entry)
|
||||
|
||||
logger.debug(
|
||||
f"DEBUG: treeget EXIT: path={path}, begin={begin}, ret_len={len(ret)}, iterations={iteration_count}"
|
||||
)
|
||||
return begin, ret
|
||||
|
||||
|
||||
def treeinspos(rootmod: list[FileEntry], relpath: PurePosixPath, relfile: int):
|
||||
# Find the first entry greater than the new one
|
||||
# precondition: the new entry doesn't exist
|
||||
logger.debug(
|
||||
f"DEBUG: treeinspos ENTRY: relpath={relpath}, relfile={relfile}, rootmod_len={len(rootmod)}"
|
||||
)
|
||||
|
||||
isfile = 0
|
||||
level = 0
|
||||
i = 0
|
||||
for i, rel, entry in treeiter(rootmod):
|
||||
|
||||
if entry.level > level:
|
||||
# We haven't found item at level, skip subdirectories
|
||||
logger.debug(
|
||||
f"DEBUG: treeinspos SKIP: entry.level={entry.level} > level={level}"
|
||||
)
|
||||
continue
|
||||
if entry.level < level:
|
||||
# We have passed the level, so the new item is the first
|
||||
logger.debug(
|
||||
f"DEBUG: treeinspos RETURN_EARLY: entry.level={entry.level} < level={level}, returning i={i}"
|
||||
)
|
||||
return i
|
||||
if level == 0:
|
||||
# root
|
||||
logger.debug("DEBUG: treeinspos ROOT: incrementing level from 0 to 1")
|
||||
level += 1
|
||||
continue
|
||||
|
||||
ename = rel.parts[level - 1]
|
||||
name = relpath.parts[level - 1]
|
||||
logger.debug(
|
||||
f"DEBUG: treeinspos COMPARE: ename='{ename}', name='{name}', level={level}"
|
||||
)
|
||||
|
||||
esort = sortkey(ename)
|
||||
nsort = sortkey(name)
|
||||
# Non-leaf are always folders, only use relfile at leaf
|
||||
isfile = relfile if len(relpath.parts) == level else 0
|
||||
logger.debug(
|
||||
f"DEBUG: treeinspos SORT: esort={esort}, nsort={nsort}, isfile={isfile}, entry.isfile={entry.isfile}"
|
||||
)
|
||||
|
||||
# First compare by isfile, then by sorting order and if that too matches then case sensitive
|
||||
cmp = (
|
||||
@ -131,27 +94,21 @@ def treeinspos(rootmod: list[FileEntry], relpath: PurePosixPath, relfile: int):
|
||||
or (esort > nsort) - (esort < nsort)
|
||||
or (ename > name) - (ename < name)
|
||||
)
|
||||
logger.debug(f"DEBUG: treeinspos CMP: cmp={cmp}")
|
||||
|
||||
if cmp > 0:
|
||||
logger.debug(f"DEBUG: treeinspos RETURN: cmp > 0, returning i={i}")
|
||||
return i
|
||||
if cmp < 0:
|
||||
logger.debug("DEBUG: treeinspos CONTINUE: cmp < 0")
|
||||
continue
|
||||
|
||||
logger.debug(f"DEBUG: treeinspos INCREMENT_LEVEL: level {level} -> {level + 1}")
|
||||
level += 1
|
||||
if level > len(relpath.parts):
|
||||
logger.error(
|
||||
f"ERROR: insertpos level overflow: relpath={relpath}, i={i}, entry.name={entry.name}, entry.level={entry.level}, level={level}"
|
||||
f"insertpos level overflow: relpath={relpath}, i={i}, entry.name={entry.name}, entry.level={entry.level}, level={level}"
|
||||
)
|
||||
break
|
||||
else:
|
||||
logger.debug(f"DEBUG: treeinspos FOR_ELSE: incrementing i from {i} to {i + 1}")
|
||||
i += 1
|
||||
|
||||
logger.debug(f"DEBUG: treeinspos EXIT: returning i={i}")
|
||||
return i
|
||||
|
||||
|
||||
@ -159,9 +116,6 @@ state = State()
|
||||
rootpath: Path = None # type: ignore
|
||||
quit = threading.Event()
|
||||
|
||||
# Keep a reference so the file stays open for faulthandler outputs
|
||||
_faulthandler_file = None # type: ignore
|
||||
|
||||
## Filesystem scanning
|
||||
|
||||
|
||||
@ -233,65 +187,18 @@ def update_root(loop):
|
||||
|
||||
def update_path(rootmod: list[FileEntry], relpath: PurePosixPath, loop):
|
||||
"""Called on FS updates, check the filesystem and broadcast any changes."""
|
||||
logger.debug(
|
||||
f"DEBUG: update_path ENTRY: path={relpath}, rootmod_len={len(rootmod)}"
|
||||
)
|
||||
|
||||
# Add timing for walk operation
|
||||
walk_start = time.perf_counter()
|
||||
new = walk(relpath)
|
||||
walk_end = time.perf_counter()
|
||||
logger.debug(
|
||||
f"DEBUG: walk({relpath}) took {walk_end - walk_start:.4f}s, returned {len(new)} entries"
|
||||
)
|
||||
|
||||
# Add timing for treeget operation
|
||||
treeget_start = time.perf_counter()
|
||||
obegin, old = treeget(rootmod, relpath)
|
||||
treeget_end = time.perf_counter()
|
||||
logger.debug(
|
||||
f"DEBUG: treeget({relpath}) took {treeget_end - treeget_start:.4f}s, obegin={obegin}, old_len={len(old) if old else 0}"
|
||||
)
|
||||
|
||||
if old == new:
|
||||
logger.debug(
|
||||
f"Watch: Event without changes needed {relpath}"
|
||||
if old
|
||||
else f"Watch: Event with old and new missing: {relpath}"
|
||||
)
|
||||
logger.debug(f"DEBUG: update_path EARLY_EXIT: no changes for {relpath}")
|
||||
return
|
||||
|
||||
# Debug the deletion operation
|
||||
if obegin is not None:
|
||||
logger.debug(
|
||||
f"DEBUG: DELETING entries from rootmod[{obegin}:{obegin + len(old)}] for path {relpath}"
|
||||
)
|
||||
del rootmod[obegin : obegin + len(old)]
|
||||
logger.debug(f"DEBUG: DELETED entries, rootmod_len now {len(rootmod)}")
|
||||
|
||||
if new:
|
||||
logger.debug(f"Watch: Update {relpath}" if old else f"Watch: Created {relpath}")
|
||||
|
||||
# Add timing for treeinspos operation - this is where hangs might occur
|
||||
inspos_start = time.perf_counter()
|
||||
i = treeinspos(rootmod, relpath, new[0].isfile)
|
||||
inspos_end = time.perf_counter()
|
||||
logger.debug(
|
||||
f"DEBUG: treeinspos({relpath}) took {inspos_end - inspos_start:.4f}s, returned index={i}"
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
f"DEBUG: INSERTING {len(new)} entries at position {i} for path {relpath}"
|
||||
)
|
||||
rootmod[i:i] = new
|
||||
logger.debug(f"DEBUG: INSERTED entries, rootmod_len now {len(rootmod)}")
|
||||
else:
|
||||
logger.debug(f"Watch: Removed {relpath}")
|
||||
|
||||
logger.debug(
|
||||
f"DEBUG: update_path EXIT: path={relpath}, final_rootmod_len={len(rootmod)}"
|
||||
)
|
||||
|
||||
|
||||
def update_space(loop):
|
||||
@ -311,8 +218,6 @@ def update_space(loop):
|
||||
|
||||
|
||||
def format_update(old, new):
|
||||
logger.debug(f"DEBUG: format_update ENTRY: old_len={len(old)}, new_len={len(new)}")
|
||||
|
||||
# Make keep/del/insert diff until one of the lists ends
|
||||
oidx, nidx = 0, 0
|
||||
oremain, nremain = set(old), set(new)
|
||||
@ -327,16 +232,10 @@ def format_update(old, new):
|
||||
while oidx < len(old) and nidx < len(new):
|
||||
iteration_count += 1
|
||||
|
||||
# Log every 1000 iterations to detect infinite loops
|
||||
if iteration_count % 1000 == 0:
|
||||
logger.debug(
|
||||
f"DEBUG: format_update iteration {iteration_count}, oidx={oidx}/{len(old)}, nidx={nidx}/{len(new)}"
|
||||
)
|
||||
|
||||
# Emergency brake for potential infinite loops
|
||||
if iteration_count > 50000:
|
||||
logger.error(
|
||||
f"ERROR: format_update potential infinite loop! iteration={iteration_count}, oidx={oidx}, nidx={nidx}"
|
||||
f"format_update potential infinite loop! iteration={iteration_count}, oidx={oidx}, nidx={nidx}"
|
||||
)
|
||||
raise Exception(
|
||||
f"format_update infinite loop detected at iteration {iteration_count}"
|
||||
@ -346,54 +245,36 @@ def format_update(old, new):
|
||||
# Matching entries are kept
|
||||
if old[oidx] == new[nidx]:
|
||||
entry = old[oidx]
|
||||
logger.debug(
|
||||
f"DEBUG: format_update MATCH: entry={entry.name}, oidx={oidx}, nidx={nidx}"
|
||||
)
|
||||
oremain.remove(entry)
|
||||
nremain.remove(entry)
|
||||
oremain.discard(entry)
|
||||
nremain.discard(entry)
|
||||
keep_count += 1
|
||||
oidx += 1
|
||||
nidx += 1
|
||||
continue
|
||||
|
||||
if keep_count > 0:
|
||||
logger.debug(f"DEBUG: format_update KEEP: adding UpdKeep({keep_count})")
|
||||
modified = True
|
||||
update.append(UpdKeep(keep_count))
|
||||
keep_count = 0
|
||||
|
||||
# Items only in old are deleted
|
||||
del_count = 0
|
||||
del_start_oidx = oidx
|
||||
while oidx < len(old) and old[oidx] not in nremain:
|
||||
logger.debug(
|
||||
f"DEBUG: format_update DELETE: removing old[{oidx}]={old[oidx].name}"
|
||||
)
|
||||
oremain.remove(old[oidx])
|
||||
del_count += 1
|
||||
oidx += 1
|
||||
if del_count:
|
||||
logger.debug(
|
||||
f"DEBUG: format_update DEL: adding UpdDel({del_count}), oidx {del_start_oidx}->{oidx}"
|
||||
)
|
||||
update.append(UpdDel(del_count))
|
||||
continue
|
||||
|
||||
# Items only in new are inserted
|
||||
insert_items = []
|
||||
ins_start_nidx = nidx
|
||||
while nidx < len(new) and new[nidx] not in oremain:
|
||||
entry = new[nidx]
|
||||
logger.debug(
|
||||
f"DEBUG: format_update INSERT: adding new[{nidx}]={entry.name}"
|
||||
)
|
||||
nremain.remove(entry)
|
||||
insert_items.append(entry)
|
||||
nidx += 1
|
||||
if insert_items:
|
||||
logger.debug(
|
||||
f"DEBUG: format_update INS: adding UpdIns({len(insert_items)} items), nidx {ins_start_nidx}->{nidx}"
|
||||
)
|
||||
modified = True
|
||||
update.append(UpdIns(insert_items))
|
||||
|
||||
@ -419,36 +300,20 @@ def format_update(old, new):
|
||||
oremain.discard(cur_old)
|
||||
update.append(UpdDel(1))
|
||||
oidx += 1
|
||||
logger.debug(
|
||||
f"DEBUG: format_update TIEBREAK_DEL: oidx->{oidx}, cur_old={cur_old.name}"
|
||||
)
|
||||
else:
|
||||
# Insert current new item
|
||||
nremain.discard(cur_new)
|
||||
update.append(UpdIns([cur_new]))
|
||||
nidx += 1
|
||||
logger.debug(
|
||||
f"DEBUG: format_update TIEBREAK_INS: nidx->{nidx}, cur_new={cur_new.name}"
|
||||
)
|
||||
|
||||
# Diff any remaining
|
||||
if keep_count > 0:
|
||||
logger.debug(f"DEBUG: format_update FINAL_KEEP: adding UpdKeep({keep_count})")
|
||||
update.append(UpdKeep(keep_count))
|
||||
if oremain:
|
||||
logger.debug(
|
||||
f"DEBUG: format_update FINAL_DEL: adding UpdDel({len(oremain)}) for remaining old items"
|
||||
)
|
||||
update.append(UpdDel(len(oremain)))
|
||||
elif nremain:
|
||||
logger.debug(
|
||||
f"DEBUG: format_update FINAL_INS: adding UpdIns({len(new[nidx:])}) for remaining new items"
|
||||
)
|
||||
update.append(UpdIns(new[nidx:]))
|
||||
|
||||
logger.debug(
|
||||
f"DEBUG: format_update EXIT: generated {len(update)} operations, iterations={iteration_count}"
|
||||
)
|
||||
return msgspec.json.encode({"update": update}).decode()
|
||||
|
||||
|
||||
@ -492,10 +357,7 @@ def watcher_inotify(loop):
|
||||
while not quit.is_set():
|
||||
i = inotify.adapters.InotifyTree(rootpath.as_posix())
|
||||
# Initialize the tree from filesystem
|
||||
t0 = time.perf_counter()
|
||||
update_root(loop)
|
||||
t1 = time.perf_counter()
|
||||
logger.debug(f"Root update took {t1 - t0:.1f}s")
|
||||
trefresh = time.monotonic() + 300.0
|
||||
tspace = time.monotonic() + 5.0
|
||||
# Watch for changes (frequent wakeups needed for quiting)
|
||||
@ -516,57 +378,29 @@ def watcher_inotify(loop):
|
||||
if quit.is_set():
|
||||
return
|
||||
interesting = any(f in modified_flags for f in event[1])
|
||||
if event[2] == rootpath.as_posix() and event[3] == "zzz":
|
||||
logger.debug(f"Watch: {interesting=} {event=}")
|
||||
if interesting:
|
||||
# Update modified path
|
||||
logger.debug(
|
||||
f"DEBUG: inotify PROCESSING: event={event}, path={event[2]}/{event[3]}"
|
||||
)
|
||||
t0 = time.perf_counter()
|
||||
path = PurePosixPath(event[2]) / event[3]
|
||||
try:
|
||||
rel_path = path.relative_to(rootpath)
|
||||
logger.debug(
|
||||
f"DEBUG: inotify CALLING update_path: rel_path={rel_path}"
|
||||
)
|
||||
update_path(rootmod, rel_path, loop)
|
||||
logger.debug(
|
||||
f"DEBUG: inotify update_path COMPLETED: rel_path={rel_path}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"ERROR: inotify update_path FAILED: path={path}, error={e}"
|
||||
f"Error processing inotify event for path {path}: {e}"
|
||||
)
|
||||
raise
|
||||
t1 = time.perf_counter()
|
||||
logger.debug(f"Watch: Update {event[3]} took {t1 - t0:.1f}s")
|
||||
if not dirty:
|
||||
t = time.monotonic()
|
||||
dirty = True
|
||||
# Wait a maximum of 0.5s to push the updates
|
||||
if dirty and time.monotonic() >= t + 0.5:
|
||||
logger.debug("DEBUG: inotify TIMEOUT: breaking due to 0.5s timeout")
|
||||
break
|
||||
if dirty and state.root != rootmod:
|
||||
logger.debug(
|
||||
f"DEBUG: inotify BATCH_UPDATE: state.root_len={len(state.root)}, rootmod_len={len(rootmod)}"
|
||||
)
|
||||
t0 = time.perf_counter()
|
||||
logger.debug("DEBUG: inotify CALLING format_update")
|
||||
try:
|
||||
update = format_update(state.root, rootmod)
|
||||
logger.debug("DEBUG: inotify format_update COMPLETED")
|
||||
t1 = time.perf_counter()
|
||||
with state.lock:
|
||||
logger.debug("DEBUG: inotify BROADCASTING update")
|
||||
broadcast(update, loop)
|
||||
state.root = rootmod
|
||||
logger.debug("DEBUG: inotify BROADCAST completed, state updated")
|
||||
t2 = time.perf_counter()
|
||||
logger.debug(
|
||||
f"Format update took {t1 - t0:.1f}s, broadcast {t2 - t1:.1f}s"
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"format_update failed; falling back to full rescan"
|
||||
@ -579,7 +413,6 @@ def watcher_inotify(loop):
|
||||
with state.lock:
|
||||
broadcast(update, loop)
|
||||
state.root = fresh
|
||||
logger.debug("Fallback diff succeeded after full rescan")
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Fallback diff failed; sending full root snapshot"
|
||||
@ -611,29 +444,6 @@ async def start(app, loop):
|
||||
global rootpath
|
||||
config.load_config()
|
||||
rootpath = config.config.path
|
||||
# Optional: enable SIGUSR1 stack dumps in production for debugging hangs
|
||||
# Control with env CISTA_STACK_DUMP (default: enabled). Sends all thread
|
||||
# stacks to a per-process log in /tmp when receiving SIGUSR1.
|
||||
if os.environ.get("CISTA_STACK_DUMP", "1") == "1":
|
||||
try:
|
||||
import faulthandler
|
||||
|
||||
global _faulthandler_file
|
||||
if _faulthandler_file is None:
|
||||
log_path = f"/tmp/cista-stacks-{os.getpid()}.log"
|
||||
# Line-buffered text file so writes appear promptly
|
||||
_faulthandler_file = open(log_path, "a", buffering=1)
|
||||
faulthandler.enable(file=_faulthandler_file)
|
||||
faulthandler.register(
|
||||
signal.SIGUSR1, file=_faulthandler_file, all_threads=True, chain=True
|
||||
)
|
||||
logger.info(
|
||||
"Stack dump enabled: send SIGUSR1 to PID %s to write all thread stacks to %s",
|
||||
os.getpid(),
|
||||
log_path,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to enable SIGUSR1 stack dump handler")
|
||||
use_inotify = sys.platform == "linux"
|
||||
app.ctx.watcher = threading.Thread(
|
||||
target=watcher_inotify if use_inotify else watcher_poll,
|
||||
|
Loading…
x
Reference in New Issue
Block a user