cista-storage/cista/watching.py

366 lines
11 KiB
Python
Raw Normal View History

import asyncio
import shutil
import sys
import threading
import time
2023-11-16 16:24:45 +00:00
from contextlib import suppress
from os import stat_result
from pathlib import Path, PurePosixPath
from stat import S_ISDIR, S_ISREG
2023-10-14 23:29:50 +01:00
import msgspec
from natsort import humansorted, natsort_keygen, ns
from sanic.log import logger
2023-10-14 23:29:50 +01:00
2023-10-21 17:17:09 +01:00
from cista import config
from cista.fileio import fuid
from cista.protocol import FileEntry, Space, UpdDel, UpdIns, UpdKeep
2023-10-14 23:29:50 +01:00
pubsub = {}
sortkey = natsort_keygen(alg=ns.LOCALE)
class State:
def __init__(self):
self.lock = threading.RLock()
self._space = Space(0, 0, 0, 0)
self._listing: list[FileEntry] = []
@property
def space(self):
with self.lock:
return self._space
@space.setter
def space(self, space):
with self.lock:
self._space = space
@property
def root(self) -> list[FileEntry]:
with self.lock:
return self._listing[:]
@root.setter
def root(self, listing: list[FileEntry]):
with self.lock:
self._listing = listing
def _slice(self, idx: PurePosixPath | tuple[PurePosixPath, int]):
relpath, relfile = idx if isinstance(idx, tuple) else (idx, 0)
begin, end = 0, len(self._listing)
level = 0
isfile = 0
# Special case for root
if not relpath.parts:
return slice(begin, end)
for part in relpath.parts:
level += 1
found = False
# Traverse contents of a folder at level
while begin + 1 < end:
begin += 1
entry = self._listing[begin]
# Skip subentries
if entry.level > level:
continue
# The directory we were traversing has ended
if entry.level < level:
break
# Found the requested part?
if entry.name == part:
found = True
# Are we finished yet?
if level == len(relpath.parts):
isfile = relfile
else:
# Enter subdirectory
continue
break
# Checking if past the requested spot in sort order
cmp = entry.isfile - isfile or sortkey(entry.name) > sortkey(part)
if cmp > 0:
break
if not found:
if level < len(relpath.parts) - 1:
# Fail if the parent folder is missing
parent = "/".join(relpath.parts[:level])
raise ValueError(
f"Parent folder {parent} is missing for {relpath.as_posix()} at {level=}"
)
# Insertion point at the end of the directory
return slice(begin, begin)
# Otherwise found=True, continue to the next part if not there yet
# Found the starting point, now find the end of the slice
for end in range(begin + 1, len(self._listing) + 1):
if end == len(self._listing) or self._listing[end].level <= level:
break
return slice(begin, end)
def __getitem__(self, index: PurePosixPath | tuple[PurePosixPath, int]):
with self.lock:
return self._listing[self._slice(index)]
def __setitem__(
self, index: tuple[PurePosixPath, int], value: list[FileEntry]
) -> None:
rel, isfile = index
with self.lock:
if rel.parts:
parent = self._slice(rel.parent)
if parent.start == parent.stop:
raise ValueError(
f"Parent folder {rel.as_posix()} is missing for {rel.name} {level=}"
)
self._listing[self._slice(index)] = value
def __delitem__(self, relpath: PurePosixPath):
with self.lock:
del self._listing[self._slice(relpath)]
state = State()
rootpath: Path = None # type: ignore
quit = threading.Event()
2023-11-16 16:24:45 +00:00
## Filesystem scanning
2023-11-16 16:24:45 +00:00
def walk(rel: PurePosixPath, stat: stat_result | None = None) -> list[FileEntry]:
path = rootpath / rel
2023-11-16 16:24:45 +00:00
ret = []
2023-10-14 23:29:50 +01:00
try:
2023-11-16 16:24:45 +00:00
st = stat or path.stat()
isfile = not S_ISDIR(st.st_mode)
entry = FileEntry(
level=len(rel.parts),
name=rel.name,
key=fuid(st),
mtime=int(st.st_mtime),
size=st.st_size if isfile else 0,
isfile=isfile,
)
if isfile:
return [entry]
# Walk all entries of the directory
ret = [entry]
li = []
for f in path.iterdir():
if quit.is_set():
raise SystemExit("quit")
if f.name.startswith("."):
continue # No dotfiles
2023-11-16 16:24:45 +00:00
with suppress(FileNotFoundError):
s = f.lstat()
isfile = S_ISREG(s.st_mode)
isdir = S_ISDIR(s.st_mode)
if not isfile and not isdir:
continue
li.append((int(isfile), f.name, s))
2023-11-16 16:24:45 +00:00
# Build the tree as a list of FileEntries
for [_, name, s] in humansorted(li):
sub = walk(rel / name, stat=s)
ret.extend(sub)
child = sub[0]
entry.mtime = max(entry.mtime, child.mtime)
entry.size += child.size
except FileNotFoundError:
pass # Things may be rapidly in motion
except OSError as e:
if e.errno == 13: # Permission denied
pass
2023-11-16 16:24:45 +00:00
logger.error(f"Watching {path=}: {e!r}")
return ret
2023-10-14 23:29:50 +01:00
2023-11-16 16:24:45 +00:00
def update_root(loop):
"""Full filesystem scan"""
new = walk(PurePosixPath())
with state.lock:
old = state.root
if old != new:
state.root = new
broadcast(format_update(old, new), loop)
def update_path(relpath: PurePosixPath, loop):
"""Called on FS updates, check the filesystem and broadcast any changes."""
new = walk(relpath)
with state.lock:
old = state[relpath]
if old == new:
return
old = state.root
if new:
logger.debug(
f"Watch: Update {relpath}" if old else f"Watch: Created {relpath}"
)
state[relpath, new[0].isfile] = new
else:
logger.debug(f"Watch: Removed {relpath}")
del state[relpath]
broadcast(format_update(old, state.root), loop)
2023-11-16 16:24:45 +00:00
def update_space(loop):
"""Called periodically to update the disk usage."""
du = shutil.disk_usage(rootpath)
space = Space(*du, storage=state.root[0].size)
# Update only on difference above 1 MB
tol = 10**6
old = msgspec.structs.astuple(state.space)
new = msgspec.structs.astuple(space)
if any(abs(o - n) > tol for o, n in zip(old, new, strict=True)):
state.space = space
broadcast(format_space(space), loop)
## Messaging
def format_update(old, new):
# Make keep/del/insert diff until one of the lists ends
oidx, nidx = 0, 0
update = []
keep_count = 0
while oidx < len(old) and nidx < len(new):
if old[oidx] == new[nidx]:
keep_count += 1
oidx += 1
nidx += 1
continue
if keep_count > 0:
update.append(UpdKeep(keep_count))
keep_count = 0
del_count = 0
rest = new[nidx:]
while oidx < len(old) and old[oidx] not in rest:
del_count += 1
oidx += 1
if del_count:
update.append(UpdDel(del_count))
continue
insert_items = []
rest = old[oidx:]
while nidx < len(new) and new[nidx] not in rest:
insert_items.append(new[nidx])
nidx += 1
update.append(UpdIns(insert_items))
# Diff any remaining
if keep_count > 0:
update.append(UpdKeep(keep_count))
if oidx < len(old):
update.append(UpdDel(len(old) - oidx))
elif nidx < len(new):
update.append(UpdIns(new[nidx:]))
return msgspec.json.encode({"update": update}).decode()
def format_space(usage):
return msgspec.json.encode({"space": usage}).decode()
def format_root(root):
return msgspec.json.encode({"root": root}).decode()
def broadcast(msg, loop):
return asyncio.run_coroutine_threadsafe(abroadcast(msg), loop).result()
async def abroadcast(msg):
try:
for queue in pubsub.values():
queue.put_nowait(msg)
except Exception:
# Log because asyncio would silently eat the error
logger.exception("Broadcast error")
2023-11-16 16:24:45 +00:00
## Watcher thread
def watcher_inotify(loop):
"""Inotify watcher thread (Linux only)"""
import inotify.adapters
modified_flags = (
"IN_CREATE",
"IN_DELETE",
"IN_DELETE_SELF",
"IN_MODIFY",
"IN_MOVE_SELF",
"IN_MOVED_FROM",
"IN_MOVED_TO",
)
while not quit.is_set():
i = inotify.adapters.InotifyTree(rootpath.as_posix())
# Initialize the tree from filesystem
update_root(loop)
trefresh = time.monotonic() + 30.0
tspace = time.monotonic() + 5.0
# Watch for changes (frequent wakeups needed for quiting)
for event in i.event_gen(timeout_s=0.1):
if quit.is_set():
break
t = time.monotonic()
# The watching is not entirely reliable, so do a full refresh every 30 seconds
if t >= trefresh:
break
# Disk usage update
if t >= tspace:
tspace = time.monotonic() + 5.0
update_space(loop)
# Inotify event, update the tree
if event and any(f in modified_flags for f in event[1]):
# Update modified path
path = PurePosixPath(event[2]) / event[3]
update_path(path.relative_to(rootpath), loop)
2023-11-16 16:24:45 +00:00
del i # Free the inotify object
def watcher_poll(loop):
"""Polling version of the watcher thread."""
while not quit.is_set():
t0 = time.perf_counter()
2023-11-16 16:24:45 +00:00
update_root(loop)
update_space(loop)
dur = time.perf_counter() - t0
if dur > 1.0:
logger.debug(f"Reading the full file list took {dur:.1f}s")
quit.wait(0.1 + 8 * dur)
2023-11-16 16:24:45 +00:00
async def start(app, loop):
2023-11-16 16:24:45 +00:00
global rootpath
config.load_config()
2023-11-16 16:24:45 +00:00
rootpath = config.config.path
use_inotify = sys.platform == "linux"
app.ctx.watcher = threading.Thread(
2023-11-16 16:24:45 +00:00
target=watcher_inotify if use_inotify else watcher_poll,
args=[loop],
2023-11-16 16:24:45 +00:00
# Descriptive name for system monitoring
name=f"cista-watcher {rootpath}",
)
app.ctx.watcher.start()
async def stop(app, loop):
quit.set()
app.ctx.watcher.join()