A number of bugfixed on watching, which now works much better.
This commit is contained in:
parent
4de2027959
commit
4c51029c9f
|
@ -110,8 +110,9 @@ function handleUpdateMessage(updateData: { update: UpdateEntry[] }) {
|
||||||
delete node.dir[elem.name]
|
delete node.dir[elem.name]
|
||||||
break // Deleted elements can't have further children
|
break // Deleted elements can't have further children
|
||||||
}
|
}
|
||||||
if (elem.name !== undefined) {
|
if (elem.name) {
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
|
console.log(node, elem.name)
|
||||||
node = node.dir[elem.name] ||= {}
|
node = node.dir[elem.name] ||= {}
|
||||||
}
|
}
|
||||||
if (elem.key !== undefined) node.key = elem.key
|
if (elem.key !== undefined) node.key = elem.key
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import typing
|
import typing
|
||||||
|
from secrets import token_bytes
|
||||||
|
|
||||||
import msgspec
|
import msgspec
|
||||||
from sanic import Blueprint
|
from sanic import Blueprint
|
||||||
|
@ -100,9 +101,10 @@ async def watch(req, ws):
|
||||||
}
|
}
|
||||||
).decode()
|
).decode()
|
||||||
)
|
)
|
||||||
|
uuid = token_bytes(16)
|
||||||
try:
|
try:
|
||||||
with watching.tree_lock:
|
with watching.tree_lock:
|
||||||
q = watching.pubsub[ws] = asyncio.Queue()
|
q = watching.pubsub[uuid] = asyncio.Queue()
|
||||||
# Init with disk usage and full tree
|
# Init with disk usage and full tree
|
||||||
await ws.send(watching.format_du())
|
await ws.send(watching.format_du())
|
||||||
await ws.send(watching.format_tree())
|
await ws.send(watching.format_tree())
|
||||||
|
@ -110,4 +112,4 @@ async def watch(req, ws):
|
||||||
while True:
|
while True:
|
||||||
await ws.send(await q.get())
|
await ws.send(await q.get())
|
||||||
finally:
|
finally:
|
||||||
del watching.pubsub[ws]
|
del watching.pubsub[uuid]
|
||||||
|
|
|
@ -45,7 +45,7 @@ class Rm(ControlBase):
|
||||||
sel = [root / filename.sanitize(p) for p in self.sel]
|
sel = [root / filename.sanitize(p) for p in self.sel]
|
||||||
for p in sel:
|
for p in sel:
|
||||||
if p.is_dir():
|
if p.is_dir():
|
||||||
shutil.rmtree(p, ignore_errors=True)
|
shutil.rmtree(p)
|
||||||
else:
|
else:
|
||||||
p.unlink()
|
p.unlink()
|
||||||
|
|
||||||
|
@ -147,9 +147,9 @@ DirList = dict[str, FileEntry | DirEntry]
|
||||||
class UpdateEntry(msgspec.Struct, omit_defaults=True):
|
class UpdateEntry(msgspec.Struct, omit_defaults=True):
|
||||||
"""Updates the named entry in the tree. Fields that are set replace old values. A list of entries recurses directories."""
|
"""Updates the named entry in the tree. Fields that are set replace old values. A list of entries recurses directories."""
|
||||||
|
|
||||||
name: str = ""
|
name: str
|
||||||
|
key: str
|
||||||
deleted: bool = False
|
deleted: bool = False
|
||||||
key: str | None = None
|
|
||||||
size: int | None = None
|
size: int | None = None
|
||||||
mtime: int | None = None
|
mtime: int | None = None
|
||||||
dir: DirList | None = None
|
dir: DirList | None = None
|
||||||
|
|
|
@ -6,6 +6,7 @@ from pathlib import Path, PurePosixPath
|
||||||
|
|
||||||
import inotify.adapters
|
import inotify.adapters
|
||||||
import msgspec
|
import msgspec
|
||||||
|
from sanic.log import logging
|
||||||
|
|
||||||
from cista import config
|
from cista import config
|
||||||
from cista.fileio import fuid
|
from cista.fileio import fuid
|
||||||
|
@ -29,7 +30,7 @@ disk_usage = None
|
||||||
|
|
||||||
|
|
||||||
def watcher_thread(loop):
|
def watcher_thread(loop):
|
||||||
global disk_usage
|
global disk_usage, rootpath
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
rootpath = config.config.path
|
rootpath = config.config.path
|
||||||
|
@ -38,6 +39,7 @@ def watcher_thread(loop):
|
||||||
with tree_lock:
|
with tree_lock:
|
||||||
# Initialize the tree from filesystem
|
# Initialize the tree from filesystem
|
||||||
tree[""] = walk(rootpath)
|
tree[""] = walk(rootpath)
|
||||||
|
print(" ".join(tree[""].dir.keys()))
|
||||||
msg = format_tree()
|
msg = format_tree()
|
||||||
if msg != old:
|
if msg != old:
|
||||||
asyncio.run_coroutine_threadsafe(broadcast(msg), loop)
|
asyncio.run_coroutine_threadsafe(broadcast(msg), loop)
|
||||||
|
@ -67,8 +69,8 @@ def watcher_thread(loop):
|
||||||
try:
|
try:
|
||||||
update(path.relative_to(rootpath), loop)
|
update(path.relative_to(rootpath), loop)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print("Watching error", e)
|
print("Watching error", e, path, rootpath)
|
||||||
break
|
raise
|
||||||
i = None # Free the inotify object
|
i = None # Free the inotify object
|
||||||
|
|
||||||
|
|
||||||
|
@ -120,6 +122,8 @@ def walk(path: Path) -> DirEntry | FileEntry | None:
|
||||||
|
|
||||||
def update(relpath: PurePosixPath, loop):
|
def update(relpath: PurePosixPath, loop):
|
||||||
"""Called by inotify updates, check the filesystem and broadcast any changes."""
|
"""Called by inotify updates, check the filesystem and broadcast any changes."""
|
||||||
|
if rootpath is None or relpath is None:
|
||||||
|
print("ERROR", rootpath, relpath)
|
||||||
new = walk(rootpath / relpath)
|
new = walk(rootpath / relpath)
|
||||||
with tree_lock:
|
with tree_lock:
|
||||||
update = update_internal(relpath, new)
|
update = update_internal(relpath, new)
|
||||||
|
@ -160,7 +164,7 @@ def update_internal(
|
||||||
# Update parents
|
# Update parents
|
||||||
update = []
|
update = []
|
||||||
for name, entry in elems[:-1]:
|
for name, entry in elems[:-1]:
|
||||||
u = UpdateEntry(name)
|
u = UpdateEntry(name, entry.key)
|
||||||
if szdiff:
|
if szdiff:
|
||||||
entry.size += szdiff
|
entry.size += szdiff
|
||||||
u.size = entry.size
|
u.size = entry.size
|
||||||
|
@ -170,14 +174,14 @@ def update_internal(
|
||||||
# The last element is the one that changed
|
# The last element is the one that changed
|
||||||
name, entry = elems[-1]
|
name, entry = elems[-1]
|
||||||
parent = elems[-2][1] if len(elems) > 1 else tree
|
parent = elems[-2][1] if len(elems) > 1 else tree
|
||||||
u = UpdateEntry(name)
|
u = UpdateEntry(name, new.key if new else entry.key)
|
||||||
if new:
|
if new:
|
||||||
parent[name] = new
|
parent[name] = new
|
||||||
if u.size != new.size:
|
if u.size != new.size:
|
||||||
u.size = new.size
|
u.size = new.size
|
||||||
if u.mtime != new.mtime:
|
if u.mtime != new.mtime:
|
||||||
u.mtime = new.mtime
|
u.mtime = new.mtime
|
||||||
if isinstance(new, DirEntry) and u.dir == new.dir:
|
if isinstance(new, DirEntry) and u.dir != new.dir:
|
||||||
u.dir = new.dir
|
u.dir = new.dir
|
||||||
else:
|
else:
|
||||||
del parent[name]
|
del parent[name]
|
||||||
|
@ -187,8 +191,12 @@ def update_internal(
|
||||||
|
|
||||||
|
|
||||||
async def broadcast(msg):
|
async def broadcast(msg):
|
||||||
for queue in pubsub.values():
|
try:
|
||||||
await queue.put_nowait(msg)
|
for queue in pubsub.values():
|
||||||
|
queue.put_nowait(msg)
|
||||||
|
except Exception:
|
||||||
|
# Log because asyncio would silently eat the error
|
||||||
|
logging.exception("Broadcast error")
|
||||||
|
|
||||||
|
|
||||||
async def start(app, loop):
|
async def start(app, loop):
|
||||||
|
|
Loading…
Reference in New Issue
Block a user