# cista-storage/cista/watching.py

import asyncio
import shutil
import stat
import sys
import threading
import time
from os import stat_result
from pathlib import Path, PurePosixPath
2023-10-14 23:29:50 +01:00
import msgspec
from natsort import humansorted, natsort_keygen, ns
from sanic.log import logging
2023-10-14 23:29:50 +01:00
2023-10-21 17:17:09 +01:00
from cista import config
from cista.fileio import fuid
from cista.protocol import FileEntry, Space, UpdDel, UpdIns, UpdKeep
2023-10-14 23:29:50 +01:00
# Registry of subscribers: broadcast() puts each message on every value of this
# dict with put_nowait(). NOTE(review): the keys are assigned elsewhere.
pubsub = {}
# Natural-sort key function (natsort, ns.LOCALE) used to compare entry names.
sortkey = natsort_keygen(alg=ns.LOCALE)
class State:
    """Lock-protected copy of the directory listing and disk usage.

    The listing is a flat list of ``FileEntry`` items in depth-first order:
    every entry is directly followed by its descendants, which have a strictly
    greater ``level``. Index 0 is the root entry (level 0). Siblings are
    expected in the order the directory walk produces: sorted by ``isfile``
    first and then by the natural sort key of the name.
    """

    def __init__(self):
        self.lock = threading.RLock()
        self._space = Space(0, 0, 0, 0)
        self._listing: list[FileEntry] = []

    @property
    def space(self):
        """The most recently stored disk usage."""
        with self.lock:
            return self._space

    @space.setter
    def space(self, space):
        with self.lock:
            self._space = space

    @property
    def root(self) -> list[FileEntry]:
        """A shallow copy of the whole listing."""
        with self.lock:
            return self._listing[:]

    @root.setter
    def root(self, listing: list[FileEntry]):
        with self.lock:
            self._listing = listing

    def _slice(self, idx: PurePosixPath | tuple[PurePosixPath, int]):
        """Locate a path in the listing. The caller must hold the lock.

        *idx* is a relative path or a ``(path, isfile)`` tuple. An existing
        entry is matched by name alone; ``isfile`` only decides where a missing
        entry belongs among its siblings.

        Returns a slice covering the entry and all of its descendants when the
        path exists, otherwise an empty slice at the position where the entry
        would have to be inserted.
        """
        relpath, relfile = idx if isinstance(idx, tuple) else (idx, 0)
        listing = self._listing
        total = len(listing)
        if not total:
            return slice(0, 0)
        parts = relpath.parts
        begin = 0  # Index of the deepest entry matched so far (the root)
        for level, name in enumerate(parts, start=1):
            # Only the final path component may be a file
            isfile = relfile if level == len(parts) else 0
            target = (isfile, sortkey(name))
            insert_at = None
            pos = begin + 1
            # Scan the contents of the directory found at `begin`
            while pos < total and listing[pos].level >= level:
                r = listing[pos]
                if r.level == level:
                    if r.name == name:
                        break
                    # Remember the first sibling that sorts after the target
                    if insert_at is None and (r.isfile, sortkey(r.name)) > target:
                        insert_at = pos
                pos += 1
            else:
                # Not found: insert before the first greater sibling, or at
                # the end of the directory contents if there is none
                if insert_at is None:
                    insert_at = pos
                return slice(insert_at, insert_at)
            begin = pos
        # Found an item, now find its end (it may extend to the end of listing)
        level = len(parts)
        end = begin + 1
        while end < total and listing[end].level > level:
            end += 1
        return slice(begin, end)

    def __getitem__(self, index: PurePosixPath | tuple[PurePosixPath, int]):
        """Return the entry at *index* with its descendants, or [] if missing."""
        with self.lock:
            return self._listing[self._slice(index)]

    def __setitem__(
        self, index: tuple[PurePosixPath, int], value: list[FileEntry]
    ) -> None:
        """Replace the subtree at ``(path, isfile)`` or insert it if missing.

        Raises ValueError when the parent folder is not in the listing.
        """
        rel, isfile = index
        with self.lock:
            if rel.parts:
                parent = self._slice(rel.parent)
                if parent.start == parent.stop:
                    raise ValueError(
                        f"Parent folder {rel.parent.as_posix()} is missing for {rel.name}"
                    )
            self._listing[self._slice(index)] = value

    def __delitem__(self, relpath: PurePosixPath):
        """Remove the entry at *relpath* and its descendants, if present."""
        with self.lock:
            del self._listing[self._slice(relpath)]

    def _index(self, rel: PurePosixPath):
        """Unfinished placeholder: does nothing and returns None."""
        return None

    def _dir(self, idx: int):
        """Return the direct children of the directory entry at *idx*.

        The result is ``(child_indices, end)`` where ``end`` is the index just
        past the directory's contents.
        """
        level = self._listing[idx].level + 1
        end = len(self._listing)
        idx += 1
        ret = []
        while idx < end and (r := self._listing[idx]).level >= level:
            if r.level == level:
                ret.append(idx)
            idx += 1
        return ret, idx

    def update(self, rel: PurePosixPath, value: FileEntry):
        """Unfinished: in-place update of a single entry.

        The previous draft looped forever (growing a list without bound) for
        any non-root path, so it fails loudly until it is implemented.
        """
        raise NotImplementedError("State.update is not implemented")
# Shared listing and disk usage state, written by the watcher threads below.
state = State()
# Root folder being watched; the watcher threads assign it from config.config.path.
rootpath: Path = None  # type: ignore
# Set to True by stop() to ask the watcher thread to exit.
quit = False
# inotify event flags that cause the affected path to be re-checked.
modified_flags = (
    "IN_CREATE",
    "IN_DELETE",
    "IN_DELETE_SELF",
    "IN_MODIFY",
    "IN_MOVE_SELF",
    "IN_MOVED_FROM",
    "IN_MOVED_TO",
)
def watcher_thread(loop):
    """Watch the configured root folder with inotify and broadcast changes.

    Runs until the module-level ``quit`` flag is set. Every pass of the outer
    loop creates a new inotify tree and walks the whole folder; the inner loop
    then handles individual events until a full refresh is due. Broadcast
    coroutines are submitted to *loop* and waited for.
    """
    global rootpath
    import inotify.adapters

    while True:
        rootpath = config.config.path
        i = inotify.adapters.InotifyTree(rootpath.as_posix())
        # Initialize the tree from filesystem
        old, new = state.root, walk()
        if old != new:
            # The root setter takes state.lock itself. The broadcast is waited
            # for without holding the lock so that code on the event loop
            # reading the state cannot deadlock against this thread.
            state.root = new
            msg = format_tree(new)
            asyncio.run_coroutine_threadsafe(broadcast(msg), loop).result()
        # The watching is not entirely reliable, so do a full refresh every minute
        refreshdl = time.monotonic() + 60.0
        for event in i.event_gen():
            if quit:
                return
            # Disk usage update
            du = shutil.disk_usage(rootpath)
            # walk() yields an empty listing when the root cannot be stat'ed
            listing = state.root
            space = Space(*du, storage=listing[0].size if listing else 0)
            if space != state.space:
                state.space = space
                asyncio.run_coroutine_threadsafe(
                    broadcast(format_du(space)), loop
                ).result()
                break
            # Do a full refresh?
            if time.monotonic() > refreshdl:
                break
            if event is None:
                continue
            _, flags, path, filename = event
            if not any(f in modified_flags for f in flags):
                continue
            # Update modified path
            path = PurePosixPath(path) / filename
            try:
                update(path.relative_to(rootpath), loop)
            except Exception as e:
                print("Watching error", e, path, rootpath)
                raise
        i = None  # Free the inotify object
2023-10-14 23:29:50 +01:00
def watcher_thread_poll(loop):
    """Polling watcher: re-walk the root folder every two seconds.

    Runs until the module-level ``quit`` flag is set, broadcasting the full
    tree and the disk usage whenever they change. Broadcast coroutines are
    submitted to *loop* and waited for.
    """
    global rootpath

    while not quit:
        rootpath = config.config.path
        old = state.root
        new = walk()
        if old != new:
            state.root = new
            asyncio.run_coroutine_threadsafe(broadcast(format_tree(new)), loop).result()
        # Disk usage update
        du = shutil.disk_usage(rootpath)
        # walk() yields an empty listing when the root cannot be stat'ed
        listing = state.root
        space = Space(*du, storage=listing[0].size if listing else 0)
        if space != state.space:
            state.space = space
            asyncio.run_coroutine_threadsafe(broadcast(format_du(space)), loop).result()
        time.sleep(2.0)
def format_du(usage):
    """Encode a disk usage message as a JSON string under the "space" key."""
    payload = {"space": usage}
    encoded = msgspec.json.encode(payload)
    return encoded.decode()
def format_tree(root):
    """Encode a full listing message as a JSON string under the "root" key."""
    payload = {"root": root}
    encoded = msgspec.json.encode(payload)
    return encoded.decode()
def walk(rel=PurePosixPath()) -> list[FileEntry]:  # noqa: B008
    """Return the flat listing for *rel*, a path relative to ``rootpath``.

    The result is empty when the path cannot be stat'ed.
    """
    target = rootpath / rel
    try:
        info = target.stat()
    except OSError:
        return []
    is_dir = stat.S_ISDIR(info.st_mode)
    return _walk(rel, 0 if is_dir else 1, info)
def _walk(rel: PurePosixPath, isfile: int, st: stat_result) -> list[FileEntry]:
    """Build the flat listing for *rel* and, for a directory, all its contents.

    *isfile* is 0 for a directory and 1 otherwise; *st* is the stat result of
    the path. The first item of the returned list is the entry for *rel*
    itself, followed by its children in ``humansorted`` order of
    ``(isfile, name)``. A directory's size is the sum of its children's sizes
    and its mtime is the newest mtime found below it. Names starting with a
    dot are skipped.
    """
    entry = FileEntry(
        level=len(rel.parts),
        name=rel.name,
        key=fuid(st),
        mtime=int(st.st_mtime),
        size=st.st_size if isfile else 0,
        isfile=isfile,
    )
    if isfile:
        return [entry]
    ret = [entry]
    path = rootpath / rel
    try:
        li = []
        for f in path.iterdir():
            if f.name.startswith("."):
                continue  # No dotfiles
            try:
                s = f.stat()
            except FileNotFoundError:
                # The entry vanished or is a dangling symlink: skip only this
                # one instead of abandoning the rest of the directory
                continue
            li.append((int(not stat.S_ISDIR(s.st_mode)), f.name, s))
        for [child_isfile, name, s] in humansorted(li):
            subtree = _walk(rel / name, child_isfile, s)
            child = subtree[0]
            entry.mtime = max(entry.mtime, child.mtime)
            entry.size += child.size
            ret.extend(subtree)
    except FileNotFoundError:
        pass  # Things may be rapidly in motion
    except OSError as e:
        print("OS error walking path", path, e)
    return ret
2023-10-14 23:29:50 +01:00
def update(relpath: PurePosixPath, loop):
    """Called by inotify updates, check the filesystem and broadcast any changes.

    *relpath* is relative to ``rootpath``. The subtree at that path is walked
    again and, if it differs from the stored one, the state is modified and
    the difference between the old and new listings is broadcast on *loop*.
    """
    if rootpath is None or relpath is None:
        print("ERROR", rootpath, relpath)
        return  # Nothing can be walked without both paths
    # The tree walk never lists dot-entries, so their parents are not in the
    # state either; events below them must not be inserted into the listing.
    if any(part.startswith(".") for part in relpath.parts):
        return
    new = walk(relpath)
    old = state[relpath]
    if old == new:
        return
    old = state.root
    if new:
        state[relpath, new[0].isfile] = new
    else:
        del state[relpath]
    # Send only the difference between the previous and the current listing
    msg = format_update(old, state.root)
    asyncio.run_coroutine_threadsafe(broadcast(msg), loop).result()
def format_update(old, new):
    """Encode the difference between two listings as a JSON "update" message.

    The message is a sequence of ``UpdKeep(count)``, ``UpdDel(count)`` and
    ``UpdIns(items)`` operations that, applied in order to *old*, produce
    *new*.
    """
    # Make keep/del/insert diff until one of the lists ends
    oidx, nidx = 0, 0
    update = []
    keep_count = 0
    while oidx < len(old) and nidx < len(new):
        # Matching entries are kept
        if old[oidx] == new[nidx]:
            keep_count += 1
            oidx += 1
            nidx += 1
            continue
        if keep_count > 0:
            update.append(UpdKeep(keep_count))
            keep_count = 0
        # Old entries that do not appear in the rest of new are deleted
        del_count = 0
        rest = new[nidx:]
        while oidx < len(old) and old[oidx] not in rest:
            del_count += 1
            oidx += 1
        if del_count:
            update.append(UpdDel(del_count))
            continue
        # New entries that do not appear in the rest of old are inserted
        insert_items = []
        rest = old[oidx:]
        while nidx < len(new) and new[nidx] not in rest:
            insert_items.append(new[nidx])
            nidx += 1
        if insert_items:
            update.append(UpdIns(insert_items))
            continue
        # Both entries appear later on the other side (reordering). Delete the
        # old one to guarantee progress; it is inserted again when the new
        # side reaches it.
        update.append(UpdDel(1))
        oidx += 1
    # Diff any remaining
    if keep_count > 0:
        update.append(UpdKeep(keep_count))
    if oidx < len(old):
        update.append(UpdDel(len(old) - oidx))
    elif nidx < len(new):
        update.append(UpdIns(new[nidx:]))
    return msgspec.json.encode({"update": update}).decode()
async def broadcast(msg):
    """Put *msg* on the queue of every subscriber registered in ``pubsub``.

    A failure on one queue is logged and does not prevent delivery to the
    remaining subscribers.
    """
    for queue in pubsub.values():
        try:
            queue.put_nowait(msg)
        except Exception:
            # Log because asyncio would silently eat the error
            logging.exception("Broadcast error")
async def start(app, loop):
    """Load the configuration and start the watcher thread.

    The inotify watcher is used on Linux and the polling watcher elsewhere.
    The thread is stored as ``app.ctx.watcher``.
    """
    global quit
    config.load_config()
    # stop() leaves the flag set; clear it so a later start gets a live watcher
    quit = False
    app.ctx.watcher = threading.Thread(
        target=watcher_thread if sys.platform == "linux" else watcher_thread_poll,
        args=[loop],
    )
    app.ctx.watcher.start()
async def stop(app, loop):
    """Ask the watcher thread to exit and wait until it has finished."""
    global quit
    quit = True
    # Join in a worker thread: the watcher may be blocked waiting for a
    # coroutine it submitted to this event loop, so the loop must keep running
    # while we wait or neither side could ever finish.
    await asyncio.to_thread(app.ctx.watcher.join)