From 63bbe84859863b5a654c4d1ec8bd84342e3b20f6 Mon Sep 17 00:00:00 2001 From: Leo Vasanko Date: Fri, 27 Oct 2023 07:51:51 +0300 Subject: [PATCH] Provide file/dir id from server. The value will stay the same if the file is renamed/moved, but will change if a file is replaced by another with the same name. --- cista/protocol.py | 40 ++++++++++++++++++----- cista/watching.py | 83 +++++++++++++++++++++++++++++++++++------------ 2 files changed, 93 insertions(+), 30 deletions(-) diff --git a/cista/protocol.py b/cista/protocol.py index 1964e06..9ab259b 100755 --- a/cista/protocol.py +++ b/cista/protocol.py @@ -11,19 +11,24 @@ from cista.util import filename ## Control commands + class ControlBase(msgspec.Struct, tag_field="op", tag=str.lower): def __call__(self): raise NotImplementedError + class MkDir(ControlBase): path: str + def __call__(self): path = config.config.path / filename.sanitize(self.path) path.mkdir(parents=False, exist_ok=False) + class Rename(ControlBase): path: str to: str + def __call__(self): to = filename.sanitize(self.to) if "/" in to: @@ -31,17 +36,21 @@ class Rename(ControlBase): path = config.config.path / filename.sanitize(self.path) path.rename(path.with_name(to)) + class Rm(ControlBase): sel: list[str] + def __call__(self): root = config.config.path sel = [root / filename.sanitize(p) for p in self.sel] for p in sel: shutil.rmtree(p, ignore_errors=True) + class Mv(ControlBase): sel: list[str] dst: str + def __call__(self): root = config.config.path sel = [root / filename.sanitize(p) for p in self.sel] @@ -51,9 +60,11 @@ class Mv(ControlBase): for p in sel: shutil.move(p, dst) + class Cp(ControlBase): sel: list[str] dst: str + def __call__(self): root = config.config.path sel = [root / filename.sanitize(p) for p in self.sel] @@ -62,30 +73,41 @@ class Cp(ControlBase): raise BadRequest("The destination must be a directory") for p in sel: # Note: copies as dst rather than in dst unless name is appended. - shutil.copytree(p, dst / p.name, dirs_exist_ok=True, ignore_dangling_symlinks=True) + shutil.copytree( + p, dst / p.name, dirs_exist_ok=True, ignore_dangling_symlinks=True + ) + ## File uploads and downloads + class FileRange(msgspec.Struct): name: str size: int start: int end: int + class StatusMsg(msgspec.Struct): status: str req: FileRange + class ErrorMsg(msgspec.Struct): error: dict[str, Any] + ## Directory listings + class FileEntry(msgspec.Struct): + id: str size: int mtime: int + class DirEntry(msgspec.Struct): + id: str size: int mtime: int dir: DirList @@ -104,30 +126,30 @@ class DirEntry(msgspec.Struct): @property def props(self): - return { - k: v - for k, v in self.__struct_fields__ - if k != "dir" - } + return {k: v for k, v in self.__struct_fields__ if k != "dir"} + DirList = dict[str, FileEntry | DirEntry] class UpdateEntry(msgspec.Struct, omit_defaults=True): """Updates the named entry in the tree. Fields that are set replace old values. A list of entries recurses directories.""" + name: str = "" deleted: bool = False + id: str | None = None size: int | None = None mtime: int | None = None dir: DirList | None = None + def make_dir_data(root): - if len(root) == 2: + if len(root) == 3: return FileEntry(*root) - size, mtime, listing = root + id_, size, mtime, listing = root converted = {} for name, data in listing.items(): converted[name] = make_dir_data(data) sz = sum(x.size for x in converted.values()) mt = max(x.mtime for x in converted.values()) - return DirEntry(sz, max(mt, mtime), converted) + return DirEntry(id_, sz, max(mt, mtime), converted) diff --git a/cista/watching.py b/cista/watching.py index 7e85758..dbdbe78 100755 --- a/cista/watching.py +++ b/cista/watching.py @@ -8,6 +8,7 @@ import inotify.adapters import msgspec from cista import config +from cista.fileio import fuid from cista.protocol import DirEntry, FileEntry, UpdateEntry pubsub = {} @@ -15,9 +16,18 @@ tree = {"": None} tree_lock = threading.Lock() rootpath: Path = None # type: ignore quit = False -modified_flags = "IN_CREATE", "IN_DELETE", "IN_DELETE_SELF", "IN_MODIFY", "IN_MOVE_SELF", "IN_MOVED_FROM", "IN_MOVED_TO" +modified_flags = ( + "IN_CREATE", + "IN_DELETE", + "IN_DELETE_SELF", + "IN_MODIFY", + "IN_MOVE_SELF", + "IN_MOVED_FROM", + "IN_MOVED_TO", +) disk_usage = None + def watcher_thread(loop): global disk_usage @@ -36,7 +46,8 @@ def watcher_thread(loop): refreshdl = time.monotonic() + 60.0 for event in i.event_gen(): - if quit: return + if quit: + return # Disk usage update du = shutil.disk_usage(rootpath) if du != disk_usage: @@ -44,8 +55,10 @@ def watcher_thread(loop): asyncio.run_coroutine_threadsafe(broadcast(format_du()), loop) break # Do a full refresh? - if time.monotonic() > refreshdl: break - if event is None: continue + if time.monotonic() > refreshdl: + break + if event is None: + continue _, flags, path, filename = event if not any(f in modified_flags for f in flags): continue @@ -58,50 +71,72 @@ def watcher_thread(loop): break i = None # Free the inotify object + def format_du(): - return msgspec.json.encode({"space": { - "disk": disk_usage.total, - "used": disk_usage.used, - "free": disk_usage.free, - "storage": tree[""].size, - }}).decode() + return msgspec.json.encode( + { + "space": { + "disk": disk_usage.total, + "used": disk_usage.used, + "free": disk_usage.free, + "storage": tree[""].size, + } + } + ).decode() + def format_tree(): root = tree[""] - return msgspec.json.encode({"update": [ - UpdateEntry(size=root.size, mtime=root.mtime, dir=root.dir) - ]}).decode() + return msgspec.json.encode( + { + "update": [ + UpdateEntry(id=root.id, size=root.size, mtime=root.mtime, dir=root.dir) + ] + } + ).decode() + def walk(path: Path) -> DirEntry | FileEntry | None: try: s = path.stat() + id_ = fuid(s) mtime = int(s.st_mtime) if path.is_file(): - return FileEntry(s.st_size, mtime) + return FileEntry(id_, s.st_size, mtime) - tree = {p.name: v for p in path.iterdir() if not p.name.startswith('.') if (v := walk(p)) is not None} + tree = { + p.name: v + for p in path.iterdir() + if not p.name.startswith(".") + if (v := walk(p)) is not None + } if tree: size = sum(v.size for v in tree.values()) mtime = max(mtime, max(v.mtime for v in tree.values())) else: size = 0 - return DirEntry(size, mtime, tree) + return DirEntry(id_, size, mtime, tree) except FileNotFoundError: return None except OSError as e: print("OS error walking path", path, e) return None + def update(relpath: PurePosixPath, loop): """Called by inotify updates, check the filesystem and broadcast any changes.""" new = walk(rootpath / relpath) with tree_lock: update = update_internal(relpath, new) - if not update: return # No changes + if not update: + return # No changes msg = msgspec.json.encode({"update": update}).decode() asyncio.run_coroutine_threadsafe(broadcast(msg), loop) -def update_internal(relpath: PurePosixPath, new: DirEntry | FileEntry | None) -> list[UpdateEntry]: + +def update_internal( + relpath: PurePosixPath, new: DirEntry | FileEntry | None +) -> list[UpdateEntry]: path = "", *relpath.parts old = tree elems = [] @@ -142,25 +177,31 @@ def update_internal(relpath: PurePosixPath, new: DirEntry | FileEntry | None) -> u = UpdateEntry(name) if new: parent[name] = new - if u.size != new.size: u.size = new.size - if u.mtime != new.mtime: u.mtime = new.mtime + if u.size != new.size: + u.size = new.size + if u.mtime != new.mtime: + u.mtime = new.mtime if isinstance(new, DirEntry): - if u.dir == new.dir: u.dir = new.dir + if u.dir == new.dir: + u.dir = new.dir else: del parent[name] u.deleted = True update.append(u) return update + async def broadcast(msg): for queue in pubsub.values(): await queue.put_nowait(msg) + async def start(app, loop): config.load_config() app.ctx.watcher = threading.Thread(target=watcher_thread, args=[loop]) app.ctx.watcher.start() + async def stop(app, loop): global quit quit = True