Rewritten inotify watching and faster full tree traversal (also for polling worker). Bugs in inotify watcher fixed.

This commit is contained in:
Leo Vasanko 2023-11-20 18:49:24 +00:00
parent 667e31aa08
commit dbb06e111c
3 changed files with 157 additions and 170 deletions

View File

@ -112,7 +112,7 @@ class ErrorMsg(msgspec.Struct):
## Directory listings ## Directory listings
class FileEntry(msgspec.Struct, array_like=True): class FileEntry(msgspec.Struct, array_like=True, frozen=True):
level: int level: int
name: str name: str
key: str key: str

View File

@ -46,87 +46,70 @@ class State:
with self.lock: with self.lock:
self._listing = listing self._listing = listing
def _slice(self, idx: PurePosixPath | tuple[PurePosixPath, int]):
relpath, relfile = idx if isinstance(idx, tuple) else (idx, 0) def treeiter(rootmod):
begin, end = 0, len(self._listing) relpath = PurePosixPath()
level = 0 for i, entry in enumerate(rootmod):
if entry.level > 0:
relpath = PurePosixPath(*relpath.parts[: entry.level - 1]) / entry.name
yield i, relpath, entry
def treeget(rootmod: list[FileEntry], path: PurePosixPath):
begin = None
ret = []
for i, relpath, entry in treeiter(rootmod):
if begin is None:
if relpath == path:
begin = i
ret.append(entry)
continue
if entry.level <= len(path.parts):
break
ret.append(entry)
return begin, ret
def treeinspos(rootmod: list[FileEntry], relpath: PurePosixPath, relfile: int):
# Find the first entry greater than the new one
# precondition: the new entry doesn't exist
isfile = 0 isfile = 0
level = 0
# Special case for root i = 0
if not relpath.parts: for i, rel, entry in treeiter(rootmod):
return slice(begin, end)
for part in relpath.parts:
level += 1
found = False
# Traverse contents of a folder at level
while begin + 1 < end:
begin += 1
entry = self._listing[begin]
# Skip subentries
if entry.level > level: if entry.level > level:
# We haven't found item at level, skip subdirectories
continue continue
# The directory we were traversing has ended
if entry.level < level: if entry.level < level:
break # We have passed the level, so the new item is the first
return i
# Found the requested part? if level == 0:
if entry.name == part: # root
found = True level += 1
# Are we finished yet?
if level == len(relpath.parts):
isfile = relfile
else:
# Enter subdirectory
continue continue
break ename = rel.parts[level - 1]
name = relpath.parts[level - 1]
# Checking if past the requested spot in sort order esort = sortkey(ename)
cmp = entry.isfile - isfile or sortkey(entry.name) > sortkey(part) nsort = sortkey(name)
# Non-leaf are always folders, only use relfile at leaf
isfile = relfile if len(relpath.parts) == level else 0
# First compare by isfile, then by sorting order and if that too matches then case sensitive
cmp = (
entry.isfile - isfile
or (esort > nsort) - (esort < nsort)
or (ename > name) - (ename < name)
)
if cmp > 0: if cmp > 0:
return i
if cmp < 0:
continue
level += 1
if level > len(relpath.parts):
print("ERROR: insertpos", relpath, i, entry.name, entry.level, level)
break break
else:
if not found: i += 1
if level < len(relpath.parts) - 1: return i
# Fail if the parent folder is missing
parent = "/".join(relpath.parts[:level])
raise ValueError(
f"Parent folder {parent} is missing for {relpath.as_posix()} at {level=}"
)
# Insertion point at the end of the directory
return slice(begin, begin)
# Otherwise found=True, continue to the next part if not there yet
# Found the starting point, now find the end of the slice
for end in range(begin + 1, len(self._listing) + 1):
if end == len(self._listing) or self._listing[end].level <= level:
break
return slice(begin, end)
def __getitem__(self, index: PurePosixPath | tuple[PurePosixPath, int]):
with self.lock:
return self._listing[self._slice(index)]
def __setitem__(
self, index: tuple[PurePosixPath, int], value: list[FileEntry]
) -> None:
rel, isfile = index
with self.lock:
if rel.parts:
parent = self._slice(rel.parent)
if parent.start == parent.stop:
raise ValueError(
f"Parent folder {rel.as_posix()} is missing for {rel.name} {level=}"
)
self._listing[self._slice(index)] = value
def __delitem__(self, relpath: PurePosixPath):
with self.lock:
del self._listing[self._slice(relpath)]
state = State() state = State()
@ -141,7 +124,7 @@ def walk(rel: PurePosixPath, stat: stat_result | None = None) -> list[FileEntry]
ret = [] ret = []
try: try:
st = stat or path.stat() st = stat or path.stat()
isfile = not S_ISDIR(st.st_mode) isfile = int(not S_ISDIR(st.st_mode))
entry = FileEntry( entry = FileEntry(
level=len(rel.parts), level=len(rel.parts),
name=rel.name, name=rel.name,
@ -170,10 +153,17 @@ def walk(rel: PurePosixPath, stat: stat_result | None = None) -> list[FileEntry]
# Build the tree as a list of FileEntries # Build the tree as a list of FileEntries
for [_, name, s] in humansorted(li): for [_, name, s] in humansorted(li):
sub = walk(rel / name, stat=s) sub = walk(rel / name, stat=s)
ret.extend(sub)
child = sub[0] child = sub[0]
entry.mtime = max(entry.mtime, child.mtime) entry = FileEntry(
entry.size += child.size entry.level,
entry.name,
entry.key,
entry.size + child.size,
max(entry.mtime, child.mtime),
entry.isfile,
)
ret[0] = entry
ret.extend(sub)
except FileNotFoundError: except FileNotFoundError:
pass # Things may be rapidly in motion pass # Things may be rapidly in motion
except OSError as e: except OSError as e:
@ -193,23 +183,25 @@ def update_root(loop):
broadcast(format_update(old, new), loop) broadcast(format_update(old, new), loop)
def update_path(relpath: PurePosixPath, loop): def update_path(rootmod: list[FileEntry], relpath: PurePosixPath, loop):
"""Called on FS updates, check the filesystem and broadcast any changes.""" """Called on FS updates, check the filesystem and broadcast any changes."""
new = walk(relpath) new = walk(relpath)
with state.lock: obegin, old = treeget(rootmod, relpath)
old = state[relpath]
if old == new: if old == new:
return
old = state.root
if new:
logger.debug( logger.debug(
f"Watch: Update {relpath}" if old else f"Watch: Created {relpath}" f"Watch: Event without changes needed {relpath}"
if old
else f"Watch: Event with old and new missing: {relpath}"
) )
state[relpath, new[0].isfile] = new return
if obegin is not None:
del rootmod[obegin : obegin + len(old)]
if new:
logger.debug(f"Watch: Update {relpath}" if old else f"Watch: Created {relpath}")
i = treeinspos(rootmod, relpath, new[0].isfile)
rootmod[i:i] = new
else: else:
logger.debug(f"Watch: Removed {relpath}") logger.debug(f"Watch: Removed {relpath}")
del state[relpath]
broadcast(format_update(old, state.root), loop)
def update_space(loop): def update_space(loop):
@ -231,40 +223,57 @@ def update_space(loop):
def format_update(old, new): def format_update(old, new):
# Make keep/del/insert diff until one of the lists ends # Make keep/del/insert diff until one of the lists ends
oidx, nidx = 0, 0 oidx, nidx = 0, 0
oremain, nremain = set(old), set(new)
update = [] update = []
keep_count = 0 keep_count = 0
while oidx < len(old) and nidx < len(new): while oidx < len(old) and nidx < len(new):
modified = False
# Matching entries are kept
if old[oidx] == new[nidx]: if old[oidx] == new[nidx]:
entry = old[oidx]
oremain.remove(entry)
nremain.remove(entry)
keep_count += 1 keep_count += 1
oidx += 1 oidx += 1
nidx += 1 nidx += 1
continue continue
if keep_count > 0: if keep_count > 0:
modified = True
update.append(UpdKeep(keep_count)) update.append(UpdKeep(keep_count))
keep_count = 0 keep_count = 0
# Items only in old are deleted
del_count = 0 del_count = 0
rest = new[nidx:] while oidx < len(old) and old[oidx] not in nremain:
while oidx < len(old) and old[oidx] not in rest: oremain.remove(old[oidx])
del_count += 1 del_count += 1
oidx += 1 oidx += 1
if del_count: if del_count:
update.append(UpdDel(del_count)) update.append(UpdDel(del_count))
continue continue
# Items only in new are inserted
insert_items = [] insert_items = []
rest = old[oidx:] while nidx < len(new) and new[nidx] not in oremain:
while nidx < len(new) and new[nidx] not in rest: entry = new[nidx]
insert_items.append(new[nidx]) nremain.remove(entry)
insert_items.append(entry)
nidx += 1 nidx += 1
if insert_items:
modified = True
update.append(UpdIns(insert_items)) update.append(UpdIns(insert_items))
if not modified:
raise Exception(
f"Infinite loop in diff {nidx=} {oidx=} {len(old)=} {len(new)=}"
)
# Diff any remaining # Diff any remaining
if keep_count > 0: if keep_count > 0:
update.append(UpdKeep(keep_count)) update.append(UpdKeep(keep_count))
if oidx < len(old): if oremain:
update.append(UpdDel(len(old) - oidx)) update.append(UpdDel(len(oremain)))
elif nidx < len(new): elif nremain:
update.append(UpdIns(new[nidx:])) update.append(UpdIns(new[nidx:]))
return msgspec.json.encode({"update": update}).decode() return msgspec.json.encode({"update": update}).decode()
@ -310,13 +319,14 @@ def watcher_inotify(loop):
while not quit.is_set(): while not quit.is_set():
i = inotify.adapters.InotifyTree(rootpath.as_posix()) i = inotify.adapters.InotifyTree(rootpath.as_posix())
# Initialize the tree from filesystem # Initialize the tree from filesystem
t0 = time.perf_counter()
update_root(loop) update_root(loop)
t1 = time.perf_counter()
logger.debug(f"Root update took {t1 - t0:.1f}s")
trefresh = time.monotonic() + 30.0 trefresh = time.monotonic() + 30.0
tspace = time.monotonic() + 5.0 tspace = time.monotonic() + 5.0
# Watch for changes (frequent wakeups needed for quiting) # Watch for changes (frequent wakeups needed for quiting)
for event in i.event_gen(timeout_s=0.1): while not quit.is_set():
if quit.is_set():
break
t = time.monotonic() t = time.monotonic()
# The watching is not entirely reliable, so do a full refresh every 30 seconds # The watching is not entirely reliable, so do a full refresh every 30 seconds
if t >= trefresh: if t >= trefresh:
@ -325,11 +335,34 @@ def watcher_inotify(loop):
if t >= tspace: if t >= tspace:
tspace = time.monotonic() + 5.0 tspace = time.monotonic() + 5.0
update_space(loop) update_space(loop)
# Inotify event, update the tree # Inotify events, update the tree
if event and any(f in modified_flags for f in event[1]): events = list(i.event_gen(yield_nones=False, timeout_s=0.1))
dirty = False
rootmod = list(state.root)
for event in events:
assert event
interesting = any(f in modified_flags for f in event[1])
if event[2] == rootpath.as_posix() and event[3] == "zzz":
logger.debug(f"Watch: {interesting=} {event=}")
if interesting:
# Update modified path # Update modified path
t0 = time.perf_counter()
path = PurePosixPath(event[2]) / event[3] path = PurePosixPath(event[2]) / event[3]
update_path(path.relative_to(rootpath), loop) update_path(rootmod, path.relative_to(rootpath), loop)
t1 = time.perf_counter()
logger.debug(f"Watch: Update {event[3]} took {t1 - t0:.1f}s")
dirty = True
if dirty and state.root != rootmod:
t0 = time.perf_counter()
update = format_update(state.root, rootmod)
t1 = time.perf_counter()
with state.lock:
broadcast(update, loop)
state.root = rootmod
t2 = time.perf_counter()
logger.debug(
f"Format update took {t1 - t0:.1f}s, broadcast {t2 - t1:.1f}s"
)
del i # Free the inotify object del i # Free the inotify object

View File

@ -1,10 +1,7 @@
from pathlib import PurePosixPath
import msgspec import msgspec
import pytest
from cista.protocol import FileEntry, UpdateMessage, UpdDel, UpdIns, UpdKeep from cista.protocol import FileEntry, UpdateMessage, UpdDel, UpdIns, UpdKeep
from cista.watching import State, format_update from cista.watching import format_update
def decode(data: str): def decode(data: str):
@ -37,6 +34,14 @@ def test_insertions():
assert decode(format_update(old_list, new_list)) == expected assert decode(format_update(old_list, new_list)) == expected
def test_insertion_at_end():
old_list = [*f(3), FileEntry(1, "xxx", "xxx", 0, 0, 1)]
newfile = FileEntry(1, "yyy", "yyy", 0, 0, 1)
new_list = [*old_list, newfile]
expected = [UpdKeep(4), UpdIns([newfile])]
assert decode(format_update(old_list, new_list)) == expected
def test_deletions(): def test_deletions():
old_list = f(3) old_list = f(3)
new_list = [old_list[0], old_list[2]] new_list = [old_list[0], old_list[2]]
@ -83,54 +88,3 @@ def test_longer_lists():
def sortkey(name): def sortkey(name):
# Define the sorting key for names here # Define the sorting key for names here
return name.lower() return name.lower()
@pytest.fixture()
def state():
entries = [
FileEntry(0, "", "root", 0, 0, 0),
FileEntry(1, "bar", "bar", 0, 0, 0),
FileEntry(2, "baz", "bar/baz", 0, 0, 0),
FileEntry(1, "foo", "foo", 0, 0, 0),
FileEntry(1, "xxx", "xxx", 0, 0, 0),
FileEntry(2, "yyy", "xxx/yyy", 0, 0, 1),
]
s = State()
s._listing = entries
return s
def test_existing_directory(state):
path = PurePosixPath("bar")
expected_slice = slice(1, 3) # Includes 'bar' and 'baz'
assert state._slice(path) == expected_slice
def test_existing_file(state):
path = PurePosixPath("xxx/yyy")
expected_slice = slice(5, 6) # Only includes 'yyy'
assert state._slice(path) == expected_slice
def test_nonexistent_directory(state):
path = PurePosixPath("zzz")
expected_slice = slice(6, 6) # 'zzz' would be inserted at end
assert state._slice(path) == expected_slice
def test_nonexistent_file(state):
path = (PurePosixPath("bar/mmm"), 1)
expected_slice = slice(3, 3) # A file would be inserted after 'baz' under 'bar'
assert state._slice(path) == expected_slice
def test_root_directory(state):
path = PurePosixPath()
expected_slice = slice(0, 6) # Entire tree
assert state._slice(path) == expected_slice
def test_directory_with_subdirs_and_files(state):
path = PurePosixPath("xxx")
expected_slice = slice(4, 6) # Includes 'xxx' and 'yyy'
assert state._slice(path) == expected_slice