diff --git a/cista/api.py b/cista/api.py index 55c760c..c97d893 100644 --- a/cista/api.py +++ b/cista/api.py @@ -107,8 +107,8 @@ async def watch(req, ws): with watching.state.lock: q = watching.pubsub[uuid] = asyncio.Queue() # Init with disk usage and full tree - await ws.send(watching.format_du(watching.state.space)) - await ws.send(watching.format_tree(watching.state.root)) + await ws.send(watching.format_space(watching.state.space)) + await ws.send(watching.format_root(watching.state.root)) # Send updates while True: await ws.send(await q.get()) diff --git a/cista/watching.py b/cista/watching.py index aa704e5..49c4f29 100644 --- a/cista/watching.py +++ b/cista/watching.py @@ -86,7 +86,6 @@ class State: def __getitem__(self, index: PurePosixPath | tuple[PurePosixPath, int]): with self.lock: - print(self._slice(index)) return self._listing[self._slice(index)] def __setitem__( @@ -153,8 +152,7 @@ def watcher_thread(loop): if old != new: with state.lock: state.root = new - msg = format_tree(new) - asyncio.run_coroutine_threadsafe(broadcast(msg), loop).result() + broadcast(format_root(new), loop) # The watching is not entirely reliable, so do a full refresh every minute refreshdl = time.monotonic() + 60.0 @@ -167,9 +165,7 @@ def watcher_thread(loop): space = Space(*du, storage=state.root[0].size) if space != state.space: state.space = space - asyncio.run_coroutine_threadsafe( - broadcast(format_du(space)), loop - ).result() + broadcast(format_space(space), loop) break # Do a full refresh? if time.monotonic() > refreshdl: @@ -197,27 +193,20 @@ def watcher_thread_poll(loop): old = state.root new = walk() if old != new: - state.root = new - asyncio.run_coroutine_threadsafe(broadcast(format_tree(new)), loop).result() + with state.lock: + state.root = new + broadcast(format_update(old, new), loop) # Disk usage update du = shutil.disk_usage(rootpath) space = Space(*du, storage=state.root[0].size) if space != state.space: state.space = space - asyncio.run_coroutine_threadsafe(broadcast(format_du(space)), loop).result() + broadcast(format_space(space), loop) time.sleep(2.0) -def format_du(usage): - return msgspec.json.encode({"space": usage}).decode() - - -def format_tree(root): - return msgspec.json.encode({"root": root}).decode() - - def walk(rel=PurePosixPath()) -> list[FileEntry]: # noqa: B008 path = rootpath / rel try: @@ -265,17 +254,16 @@ def update(relpath: PurePosixPath, loop): if rootpath is None or relpath is None: print("ERROR", rootpath, relpath) new = walk(relpath) - old = state[relpath] - if old == new: - return - old = state.root - if new: - state[relpath, new[0].isfile] = new - else: - del state[relpath] - # FIXME: broadcast format_update() - msg = format_update(old, state.root) - asyncio.run_coroutine_threadsafe(broadcast(msg), loop).result() + with state.lock: + old = state[relpath] + if old == new: + return + old = state.root + if new: + state[relpath, new[0].isfile] = new + else: + del state[relpath] + broadcast(format_update(old, state.root), loop) def format_update(old, new): @@ -322,7 +310,19 @@ def format_update(old, new): return msgspec.json.encode({"update": update}).decode() -async def broadcast(msg): +def format_space(usage): + return msgspec.json.encode({"space": usage}).decode() + + +def format_root(root): + return msgspec.json.encode({"root": root}).decode() + + +def broadcast(msg, loop): + return asyncio.run_coroutine_threadsafe(abroadcast(msg), loop).result() + + +async def abroadcast(msg): try: for queue in pubsub.values(): queue.put_nowait(msg)