From dbb06e111cf98929553a6465de132c4acb275b35 Mon Sep 17 00:00:00 2001 From: Leo Vasanko Date: Mon, 20 Nov 2023 18:49:24 +0000 Subject: [PATCH] Rewritten inotify watching and faster full tree traversal (also for polling worker). Bugs in inotify watcher fixed. --- cista/protocol.py | 2 +- cista/watching.py | 261 +++++++++++++++++++++++------------------ tests/test_watching.py | 64 ++-------- 3 files changed, 157 insertions(+), 170 deletions(-) diff --git a/cista/protocol.py b/cista/protocol.py index 6714f39..da76018 100644 --- a/cista/protocol.py +++ b/cista/protocol.py @@ -112,7 +112,7 @@ class ErrorMsg(msgspec.Struct): ## Directory listings -class FileEntry(msgspec.Struct, array_like=True): +class FileEntry(msgspec.Struct, array_like=True, frozen=True): level: int name: str key: str diff --git a/cista/watching.py b/cista/watching.py index a74d0c8..47d2c22 100644 --- a/cista/watching.py +++ b/cista/watching.py @@ -46,87 +46,70 @@ class State: with self.lock: self._listing = listing - def _slice(self, idx: PurePosixPath | tuple[PurePosixPath, int]): - relpath, relfile = idx if isinstance(idx, tuple) else (idx, 0) - begin, end = 0, len(self._listing) - level = 0 - isfile = 0 - # Special case for root - if not relpath.parts: - return slice(begin, end) +def treeiter(rootmod): + relpath = PurePosixPath() + for i, entry in enumerate(rootmod): + if entry.level > 0: + relpath = PurePosixPath(*relpath.parts[: entry.level - 1]) / entry.name + yield i, relpath, entry - for part in relpath.parts: + +def treeget(rootmod: list[FileEntry], path: PurePosixPath): + begin = None + ret = [] + for i, relpath, entry in treeiter(rootmod): + if begin is None: + if relpath == path: + begin = i + ret.append(entry) + continue + if entry.level <= len(path.parts): + break + ret.append(entry) + return begin, ret + + +def treeinspos(rootmod: list[FileEntry], relpath: PurePosixPath, relfile: int): + # Find the first entry greater than the new one + # precondition: the new entry doesn't exist + isfile = 0 + level = 0 + i = 0 + for i, rel, entry in treeiter(rootmod): + if entry.level > level: + # We haven't found item at level, skip subdirectories + continue + if entry.level < level: + # We have passed the level, so the new item is the first + return i + if level == 0: + # root level += 1 - found = False - - # Traverse contents of a folder at level - while begin + 1 < end: - begin += 1 - entry = self._listing[begin] - - # Skip subentries - if entry.level > level: - continue - - # The directory we were traversing has ended - if entry.level < level: - break - - # Found the requested part? - if entry.name == part: - found = True - # Are we finished yet? - if level == len(relpath.parts): - isfile = relfile - else: - # Enter subdirectory - continue - break - - # Checking if past the requested spot in sort order - cmp = entry.isfile - isfile or sortkey(entry.name) > sortkey(part) - if cmp > 0: - break - - if not found: - if level < len(relpath.parts) - 1: - # Fail if the parent folder is missing - parent = "/".join(relpath.parts[:level]) - raise ValueError( - f"Parent folder {parent} is missing for {relpath.as_posix()} at {level=}" - ) - # Insertion point at the end of the directory - return slice(begin, begin) - - # Otherwise found=True, continue to the next part if not there yet - - # Found the starting point, now find the end of the slice - for end in range(begin + 1, len(self._listing) + 1): - if end == len(self._listing) or self._listing[end].level <= level: - break - return slice(begin, end) - - def __getitem__(self, index: PurePosixPath | tuple[PurePosixPath, int]): - with self.lock: - return self._listing[self._slice(index)] - - def __setitem__( - self, index: tuple[PurePosixPath, int], value: list[FileEntry] - ) -> None: - rel, isfile = index - with self.lock: - if rel.parts: - parent = self._slice(rel.parent) - if parent.start == parent.stop: - raise ValueError( - f"Parent folder {rel.as_posix()} is missing for {rel.name} {level=}" - ) - self._listing[self._slice(index)] = value - - def __delitem__(self, relpath: PurePosixPath): - with self.lock: - del self._listing[self._slice(relpath)] + continue + ename = rel.parts[level - 1] + name = relpath.parts[level - 1] + esort = sortkey(ename) + nsort = sortkey(name) + # Non-leaf are always folders, only use relfile at leaf + isfile = relfile if len(relpath.parts) == level else 0 + # First compare by isfile, then by sorting order and if that too matches then case sensitive + cmp = ( + entry.isfile - isfile + or (esort > nsort) - (esort < nsort) + or (ename > name) - (ename < name) + ) + if cmp > 0: + return i + if cmp < 0: + continue + level += 1 + if level > len(relpath.parts): + print("ERROR: insertpos", relpath, i, entry.name, entry.level, level) + break + else: + i += 1 + return i state = State() @@ -141,7 +124,7 @@ def walk(rel: PurePosixPath, stat: stat_result | None = None) -> list[FileEntry] ret = [] try: st = stat or path.stat() - isfile = not S_ISDIR(st.st_mode) + isfile = int(not S_ISDIR(st.st_mode)) entry = FileEntry( level=len(rel.parts), name=rel.name, @@ -170,10 +153,17 @@ def walk(rel: PurePosixPath, stat: stat_result | None = None) -> list[FileEntry] # Build the tree as a list of FileEntries for [_, name, s] in humansorted(li): sub = walk(rel / name, stat=s) - ret.extend(sub) child = sub[0] - entry.mtime = max(entry.mtime, child.mtime) - entry.size += child.size + entry = FileEntry( + entry.level, + entry.name, + entry.key, + entry.size + child.size, + max(entry.mtime, child.mtime), + entry.isfile, + ) + ret[0] = entry + ret.extend(sub) except FileNotFoundError: pass # Things may be rapidly in motion except OSError as e: @@ -193,23 +183,25 @@ def update_root(loop): broadcast(format_update(old, new), loop) -def update_path(relpath: PurePosixPath, loop): +def update_path(rootmod: list[FileEntry], relpath: PurePosixPath, loop): """Called on FS updates, check the filesystem and broadcast any changes.""" new = walk(relpath) - with state.lock: - old = state[relpath] - if old == new: - return - old = state.root - if new: - logger.debug( - f"Watch: Update {relpath}" if old else f"Watch: Created {relpath}" - ) - state[relpath, new[0].isfile] = new - else: - logger.debug(f"Watch: Removed {relpath}") - del state[relpath] - broadcast(format_update(old, state.root), loop) + obegin, old = treeget(rootmod, relpath) + if old == new: + logger.debug( + f"Watch: Event without changes needed {relpath}" + if old + else f"Watch: Event with old and new missing: {relpath}" + ) + return + if obegin is not None: + del rootmod[obegin : obegin + len(old)] + if new: + logger.debug(f"Watch: Update {relpath}" if old else f"Watch: Created {relpath}") + i = treeinspos(rootmod, relpath, new[0].isfile) + rootmod[i:i] = new + else: + logger.debug(f"Watch: Removed {relpath}") def update_space(loop): @@ -231,40 +223,57 @@ def update_space(loop): def format_update(old, new): # Make keep/del/insert diff until one of the lists ends oidx, nidx = 0, 0 + oremain, nremain = set(old), set(new) update = [] keep_count = 0 while oidx < len(old) and nidx < len(new): + modified = False + # Matching entries are kept if old[oidx] == new[nidx]: + entry = old[oidx] + oremain.remove(entry) + nremain.remove(entry) keep_count += 1 oidx += 1 nidx += 1 continue if keep_count > 0: + modified = True update.append(UpdKeep(keep_count)) keep_count = 0 + # Items only in old are deleted del_count = 0 - rest = new[nidx:] - while oidx < len(old) and old[oidx] not in rest: + while oidx < len(old) and old[oidx] not in nremain: + oremain.remove(old[oidx]) del_count += 1 oidx += 1 if del_count: update.append(UpdDel(del_count)) continue + # Items only in new are inserted insert_items = [] - rest = old[oidx:] - while nidx < len(new) and new[nidx] not in rest: - insert_items.append(new[nidx]) + while nidx < len(new) and new[nidx] not in oremain: + entry = new[nidx] + nremain.remove(entry) + insert_items.append(entry) nidx += 1 - update.append(UpdIns(insert_items)) + if insert_items: + modified = True + update.append(UpdIns(insert_items)) + + if not modified: + raise Exception( + f"Infinite loop in diff {nidx=} {oidx=} {len(old)=} {len(new)=}" + ) # Diff any remaining if keep_count > 0: update.append(UpdKeep(keep_count)) - if oidx < len(old): - update.append(UpdDel(len(old) - oidx)) - elif nidx < len(new): + if oremain: + update.append(UpdDel(len(oremain))) + elif nremain: update.append(UpdIns(new[nidx:])) return msgspec.json.encode({"update": update}).decode() @@ -310,13 +319,14 @@ def watcher_inotify(loop): while not quit.is_set(): i = inotify.adapters.InotifyTree(rootpath.as_posix()) # Initialize the tree from filesystem + t0 = time.perf_counter() update_root(loop) + t1 = time.perf_counter() + logger.debug(f"Root update took {t1 - t0:.1f}s") trefresh = time.monotonic() + 30.0 tspace = time.monotonic() + 5.0 # Watch for changes (frequent wakeups needed for quiting) - for event in i.event_gen(timeout_s=0.1): - if quit.is_set(): - break + while not quit.is_set(): t = time.monotonic() # The watching is not entirely reliable, so do a full refresh every 30 seconds if t >= trefresh: @@ -325,11 +335,34 @@ def watcher_inotify(loop): if t >= tspace: tspace = time.monotonic() + 5.0 update_space(loop) - # Inotify event, update the tree - if event and any(f in modified_flags for f in event[1]): - # Update modified path - path = PurePosixPath(event[2]) / event[3] - update_path(path.relative_to(rootpath), loop) + # Inotify events, update the tree + events = list(i.event_gen(yield_nones=False, timeout_s=0.1)) + dirty = False + rootmod = list(state.root) + for event in events: + assert event + interesting = any(f in modified_flags for f in event[1]) + if event[2] == rootpath.as_posix() and event[3] == "zzz": + logger.debug(f"Watch: {interesting=} {event=}") + if interesting: + # Update modified path + t0 = time.perf_counter() + path = PurePosixPath(event[2]) / event[3] + update_path(rootmod, path.relative_to(rootpath), loop) + t1 = time.perf_counter() + logger.debug(f"Watch: Update {event[3]} took {t1 - t0:.1f}s") + dirty = True + if dirty and state.root != rootmod: + t0 = time.perf_counter() + update = format_update(state.root, rootmod) + t1 = time.perf_counter() + with state.lock: + broadcast(update, loop) + state.root = rootmod + t2 = time.perf_counter() + logger.debug( + f"Format update took {t1 - t0:.1f}s, broadcast {t2 - t1:.1f}s" + ) del i # Free the inotify object diff --git a/tests/test_watching.py b/tests/test_watching.py index 81301a7..c358469 100644 --- a/tests/test_watching.py +++ b/tests/test_watching.py @@ -1,10 +1,7 @@ -from pathlib import PurePosixPath - import msgspec -import pytest from cista.protocol import FileEntry, UpdateMessage, UpdDel, UpdIns, UpdKeep -from cista.watching import State, format_update +from cista.watching import format_update def decode(data: str): @@ -37,6 +34,14 @@ def test_insertions(): assert decode(format_update(old_list, new_list)) == expected +def test_insertion_at_end(): + old_list = [*f(3), FileEntry(1, "xxx", "xxx", 0, 0, 1)] + newfile = FileEntry(1, "yyy", "yyy", 0, 0, 1) + new_list = [*old_list, newfile] + expected = [UpdKeep(4), UpdIns([newfile])] + assert decode(format_update(old_list, new_list)) == expected + + def test_deletions(): old_list = f(3) new_list = [old_list[0], old_list[2]] @@ -83,54 +88,3 @@ def test_longer_lists(): def sortkey(name): # Define the sorting key for names here return name.lower() - - -@pytest.fixture() -def state(): - entries = [ - FileEntry(0, "", "root", 0, 0, 0), - FileEntry(1, "bar", "bar", 0, 0, 0), - FileEntry(2, "baz", "bar/baz", 0, 0, 0), - FileEntry(1, "foo", "foo", 0, 0, 0), - FileEntry(1, "xxx", "xxx", 0, 0, 0), - FileEntry(2, "yyy", "xxx/yyy", 0, 0, 1), - ] - s = State() - s._listing = entries - return s - - -def test_existing_directory(state): - path = PurePosixPath("bar") - expected_slice = slice(1, 3) # Includes 'bar' and 'baz' - assert state._slice(path) == expected_slice - - -def test_existing_file(state): - path = PurePosixPath("xxx/yyy") - expected_slice = slice(5, 6) # Only includes 'yyy' - assert state._slice(path) == expected_slice - - -def test_nonexistent_directory(state): - path = PurePosixPath("zzz") - expected_slice = slice(6, 6) # 'zzz' would be inserted at end - assert state._slice(path) == expected_slice - - -def test_nonexistent_file(state): - path = (PurePosixPath("bar/mmm"), 1) - expected_slice = slice(3, 3) # A file would be inserted after 'baz' under 'bar' - assert state._slice(path) == expected_slice - - -def test_root_directory(state): - path = PurePosixPath() - expected_slice = slice(0, 6) # Entire tree - assert state._slice(path) == expected_slice - - -def test_directory_with_subdirs_and_files(state): - path = PurePosixPath("xxx") - expected_slice = slice(4, 6) # Includes 'xxx' and 'yyy' - assert state._slice(path) == expected_slice