Watching cleanup

This commit is contained in:
Leo Vasanko 2023-11-16 08:24:45 -08:00
parent b6b387d09b
commit 5285cb2fb5

View File

@ -1,11 +1,12 @@
import asyncio import asyncio
import shutil import shutil
import stat
import sys import sys
import threading import threading
import time import time
from contextlib import suppress
from os import stat_result from os import stat_result
from pathlib import Path, PurePosixPath from pathlib import Path, PurePosixPath
from stat import S_ISDIR
import msgspec import msgspec
from natsort import humansorted, natsort_keygen, ns from natsort import humansorted, natsort_keygen, ns
@ -115,135 +116,62 @@ state = State()
rootpath: Path = None # type: ignore rootpath: Path = None # type: ignore
quit = threading.Event() quit = threading.Event()
modified_flags = ( ## Filesystem scanning
"IN_CREATE",
"IN_DELETE",
"IN_DELETE_SELF",
"IN_MODIFY",
"IN_MOVE_SELF",
"IN_MOVED_FROM",
"IN_MOVED_TO",
)
def watcher_thread(loop): def walk(rel: PurePosixPath, stat: stat_result | None = None) -> list[FileEntry]:
global rootpath
import inotify.adapters
while not quit.is_set():
rootpath = config.config.path
i = inotify.adapters.InotifyTree(rootpath.as_posix())
# Initialize the tree from filesystem
new = walk()
with state.lock:
old = state.root
if old != new:
state.root = new
broadcast(format_update(old, new), loop)
# The watching is not entirely reliable, so do a full refresh every 30 seconds
refreshdl = time.monotonic() + 30.0
for event in i.event_gen():
if quit.is_set():
return
# Disk usage update
du = shutil.disk_usage(rootpath)
space = Space(*du, storage=state.root[0].size)
if space != state.space:
state.space = space
broadcast(format_space(space), loop)
break
# Do a full refresh?
if time.monotonic() > refreshdl:
break
if event is None:
continue
_, flags, path, filename = event
if not any(f in modified_flags for f in flags):
continue
# Update modified path
path = PurePosixPath(path) / filename
try:
update(path.relative_to(rootpath), loop)
except Exception as e:
print("Watching error", e, path, rootpath)
raise
i = None # Free the inotify object
def watcher_thread_poll(loop):
global rootpath
while not quit.is_set():
rootpath = config.config.path
new = walk()
with state.lock:
old = state.root
if old != new:
state.root = new
broadcast(format_update(old, new), loop)
# Disk usage update
du = shutil.disk_usage(rootpath)
space = Space(*du, storage=state.root[0].size)
if space != state.space:
state.space = space
broadcast(format_space(space), loop)
quit.wait(2.0)
def walk(rel=PurePosixPath()) -> list[FileEntry]: # noqa: B008
path = rootpath / rel
try:
st = path.stat()
except OSError:
return []
return _walk(rel, int(not stat.S_ISDIR(st.st_mode)), st)
def _walk(rel: PurePosixPath, isfile: int, st: stat_result) -> list[FileEntry]:
entry = FileEntry(
level=len(rel.parts),
name=rel.name,
key=fuid(st),
mtime=int(st.st_mtime),
size=st.st_size if isfile else 0,
isfile=isfile,
)
if isfile:
return [entry]
ret = [entry]
path = rootpath / rel path = rootpath / rel
ret = []
try: try:
st = stat or path.stat()
isfile = not S_ISDIR(st.st_mode)
entry = FileEntry(
level=len(rel.parts),
name=rel.name,
key=fuid(st),
mtime=int(st.st_mtime),
size=st.st_size if isfile else 0,
isfile=isfile,
)
if isfile:
return [entry]
# Walk all entries of the directory
ret = [entry]
li = [] li = []
for f in path.iterdir(): for f in path.iterdir():
if quit.is_set(): if quit.is_set():
raise SystemExit("quit") raise SystemExit("quit")
if f.name.startswith("."): if f.name.startswith("."):
continue # No dotfiles continue # No dotfiles
s = f.stat() with suppress(FileNotFoundError):
li.append((int(not stat.S_ISDIR(s.st_mode)), f.name, s)) s = f.stat()
for [isfile, name, s] in humansorted(li): li.append((int(not S_ISDIR(s.st_mode)), f.name, s))
if quit.is_set(): # Build the tree as a list of FileEntries
raise SystemExit("quit") for [_, name, s] in humansorted(li):
subtree = _walk(rel / name, isfile, s) sub = walk(rel / name, stat=s)
child = subtree[0] ret.extend(sub)
child = sub[0]
entry.mtime = max(entry.mtime, child.mtime) entry.mtime = max(entry.mtime, child.mtime)
entry.size += child.size entry.size += child.size
ret.extend(subtree)
except FileNotFoundError: except FileNotFoundError:
pass # Things may be rapidly in motion pass # Things may be rapidly in motion
except OSError as e: except OSError:
print("OS error walking path", path, e) logger.error(f"Watching {path=}: {e!r}")
return ret return ret
def update(relpath: PurePosixPath, loop): def update_root(loop):
"""Called by inotify updates, check the filesystem and broadcast any changes.""" """Full filesystem scan"""
if rootpath is None or relpath is None: new = walk(PurePosixPath())
print("ERROR", rootpath, relpath) with state.lock:
old = state.root
if old != new:
state.root = new
broadcast(format_update(old, new), loop)
def update_path(relpath: PurePosixPath, loop):
"""Called on FS updates, check the filesystem and broadcast any changes."""
new = walk(relpath) new = walk(relpath)
with state.lock: with state.lock:
old = state[relpath] old = state[relpath]
@ -257,6 +185,22 @@ def update(relpath: PurePosixPath, loop):
broadcast(format_update(old, state.root), loop) broadcast(format_update(old, state.root), loop)
def update_space(loop):
"""Called periodically to update the disk usage."""
du = shutil.disk_usage(rootpath)
space = Space(*du, storage=state.root[0].size)
# Update only on difference above 1 MB
tol = 10**6
old = msgspec.structs.astuple(state.space)
new = msgspec.structs.astuple(space)
if any(abs(o - n) > tol for o, n in zip(old, new, strict=True)):
state.space = space
broadcast(format_space(space), loop)
## Messaging
def format_update(old, new): def format_update(old, new):
# Make keep/del/insert diff until one of the lists ends # Make keep/del/insert diff until one of the lists ends
oidx, nidx = 0, 0 oidx, nidx = 0, 0
@ -320,18 +264,70 @@ async def abroadcast(msg):
logger.exception("Broadcast error") logger.exception("Broadcast error")
## Watcher thread
def watcher_inotify(loop):
"""Inotify watcher thread (Linux only)"""
import inotify.adapters
modified_flags = (
"IN_CREATE",
"IN_DELETE",
"IN_DELETE_SELF",
"IN_MODIFY",
"IN_MOVE_SELF",
"IN_MOVED_FROM",
"IN_MOVED_TO",
)
while not quit.is_set():
i = inotify.adapters.InotifyTree(rootpath.as_posix())
# Initialize the tree from filesystem
update_root(loop)
trefresh = time.monotonic() + 30.0
tspace = time.monotonic() + 5.0
# Watch for changes (frequent wakeups needed for quiting)
for event in i.event_gen(timeout_s=0.1):
if quit.is_set():
break
t = time.monotonic()
# The watching is not entirely reliable, so do a full refresh every 30 seconds
if t >= trefresh:
break
# Disk usage update
if t >= tspace:
tspace = time.monotonic() + 5.0
update_space(loop)
# Inotify event, update the tree
if event and any(f in modified_flags for f in event[1]):
# Update modified path
update_path(PurePosixPath(event[2]) / event[3], loop)
del i # Free the inotify object
def watcher_poll(loop):
"""Polling version of the watcher thread."""
while not quit.is_set():
update_root(loop)
update_space(loop)
quit.wait(2.0)
async def start(app, loop): async def start(app, loop):
global rootpath
config.load_config() config.load_config()
rootpath = config.config.path
use_inotify = sys.platform == "linux" use_inotify = sys.platform == "linux"
app.ctx.watcher = threading.Thread( app.ctx.watcher = threading.Thread(
target=watcher_thread if use_inotify else watcher_thread_poll, target=watcher_inotify if use_inotify else watcher_poll,
args=[loop], args=[loop],
name="watcher", # Descriptive name for system monitoring
name=f"cista-watcher {rootpath}",
) )
app.ctx.watcher.start() app.ctx.watcher.start()
async def stop(app, loop): async def stop(app, loop):
global quit
quit.set() quit.set()
app.ctx.watcher.join() app.ctx.watcher.join()