209 lines
5.7 KiB
Python
Executable File
209 lines
5.7 KiB
Python
Executable File
import asyncio
|
|
import shutil
|
|
import threading
|
|
import time
|
|
from pathlib import Path, PurePosixPath
|
|
|
|
import inotify.adapters
|
|
import msgspec
|
|
|
|
from cista import config
|
|
from cista.fileio import fuid
|
|
from cista.protocol import DirEntry, FileEntry, UpdateEntry
|
|
|
|
pubsub = {}
|
|
tree = {"": None}
|
|
tree_lock = threading.Lock()
|
|
rootpath: Path = None # type: ignore
|
|
quit = False
|
|
modified_flags = (
|
|
"IN_CREATE",
|
|
"IN_DELETE",
|
|
"IN_DELETE_SELF",
|
|
"IN_MODIFY",
|
|
"IN_MOVE_SELF",
|
|
"IN_MOVED_FROM",
|
|
"IN_MOVED_TO",
|
|
)
|
|
disk_usage = None
|
|
|
|
|
|
def watcher_thread(loop):
|
|
global disk_usage
|
|
|
|
while True:
|
|
rootpath = config.config.path
|
|
i = inotify.adapters.InotifyTree(rootpath.as_posix())
|
|
old = format_tree() if tree[""] else None
|
|
with tree_lock:
|
|
# Initialize the tree from filesystem
|
|
tree[""] = walk(rootpath)
|
|
msg = format_tree()
|
|
if msg != old:
|
|
asyncio.run_coroutine_threadsafe(broadcast(msg), loop)
|
|
|
|
# The watching is not entirely reliable, so do a full refresh every minute
|
|
refreshdl = time.monotonic() + 60.0
|
|
|
|
for event in i.event_gen():
|
|
if quit:
|
|
return
|
|
# Disk usage update
|
|
du = shutil.disk_usage(rootpath)
|
|
if du != disk_usage:
|
|
disk_usage = du
|
|
asyncio.run_coroutine_threadsafe(broadcast(format_du()), loop)
|
|
break
|
|
# Do a full refresh?
|
|
if time.monotonic() > refreshdl:
|
|
break
|
|
if event is None:
|
|
continue
|
|
_, flags, path, filename = event
|
|
if not any(f in modified_flags for f in flags):
|
|
continue
|
|
# Update modified path
|
|
path = PurePosixPath(path) / filename
|
|
try:
|
|
update(path.relative_to(rootpath), loop)
|
|
except Exception as e:
|
|
print("Watching error", e)
|
|
break
|
|
i = None # Free the inotify object
|
|
|
|
|
|
def format_du():
|
|
return msgspec.json.encode(
|
|
{
|
|
"space": {
|
|
"disk": disk_usage.total,
|
|
"used": disk_usage.used,
|
|
"free": disk_usage.free,
|
|
"storage": tree[""].size,
|
|
}
|
|
}
|
|
).decode()
|
|
|
|
|
|
def format_tree():
|
|
root = tree[""]
|
|
return msgspec.json.encode(
|
|
{
|
|
"update": [
|
|
UpdateEntry(id=root.id, size=root.size, mtime=root.mtime, dir=root.dir)
|
|
]
|
|
}
|
|
).decode()
|
|
|
|
|
|
def walk(path: Path) -> DirEntry | FileEntry | None:
|
|
try:
|
|
s = path.stat()
|
|
id_ = fuid(s)
|
|
mtime = int(s.st_mtime)
|
|
if path.is_file():
|
|
return FileEntry(id_, s.st_size, mtime)
|
|
|
|
tree = {
|
|
p.name: v
|
|
for p in path.iterdir()
|
|
if not p.name.startswith(".")
|
|
if (v := walk(p)) is not None
|
|
}
|
|
if tree:
|
|
size = sum(v.size for v in tree.values())
|
|
mtime = max(mtime, max(v.mtime for v in tree.values()))
|
|
else:
|
|
size = 0
|
|
return DirEntry(id_, size, mtime, tree)
|
|
except FileNotFoundError:
|
|
return None
|
|
except OSError as e:
|
|
print("OS error walking path", path, e)
|
|
return None
|
|
|
|
|
|
def update(relpath: PurePosixPath, loop):
|
|
"""Called by inotify updates, check the filesystem and broadcast any changes."""
|
|
new = walk(rootpath / relpath)
|
|
with tree_lock:
|
|
update = update_internal(relpath, new)
|
|
if not update:
|
|
return # No changes
|
|
msg = msgspec.json.encode({"update": update}).decode()
|
|
asyncio.run_coroutine_threadsafe(broadcast(msg), loop)
|
|
|
|
|
|
def update_internal(
|
|
relpath: PurePosixPath, new: DirEntry | FileEntry | None
|
|
) -> list[UpdateEntry]:
|
|
path = "", *relpath.parts
|
|
old = tree
|
|
elems = []
|
|
for name in path:
|
|
if name not in old:
|
|
# File or folder created
|
|
old = None
|
|
elems.append((name, None))
|
|
if len(elems) < len(path):
|
|
# We got a notify for an item whose parent is not in tree
|
|
print("Tree out of sync DEBUG", relpath)
|
|
print(elems)
|
|
print("Current tree:")
|
|
print(tree[""])
|
|
print("Walking all:")
|
|
print(walk(rootpath))
|
|
raise ValueError("Tree out of sync")
|
|
break
|
|
old = old[name]
|
|
elems.append((name, old))
|
|
if old == new:
|
|
return []
|
|
mt = new.mtime if new else 0
|
|
szdiff = (new.size if new else 0) - (old.size if old else 0)
|
|
# Update parents
|
|
update = []
|
|
for name, entry in elems[:-1]:
|
|
u = UpdateEntry(name)
|
|
if szdiff:
|
|
entry.size += szdiff
|
|
u.size = entry.size
|
|
if mt > entry.mtime:
|
|
u.mtime = entry.mtime = mt
|
|
update.append(u)
|
|
# The last element is the one that changed
|
|
name, entry = elems[-1]
|
|
parent = elems[-2][1] if len(elems) > 1 else tree
|
|
u = UpdateEntry(name)
|
|
if new:
|
|
parent[name] = new
|
|
if u.size != new.size:
|
|
u.size = new.size
|
|
if u.mtime != new.mtime:
|
|
u.mtime = new.mtime
|
|
if isinstance(new, DirEntry):
|
|
if u.dir == new.dir:
|
|
u.dir = new.dir
|
|
else:
|
|
del parent[name]
|
|
u.deleted = True
|
|
update.append(u)
|
|
return update
|
|
|
|
|
|
async def broadcast(msg):
|
|
for queue in pubsub.values():
|
|
await queue.put_nowait(msg)
|
|
|
|
|
|
async def start(app, loop):
|
|
config.load_config()
|
|
app.ctx.watcher = threading.Thread(target=watcher_thread, args=[loop])
|
|
app.ctx.watcher.start()
|
|
|
|
|
|
async def stop(app, loop):
|
|
global quit
|
|
quit = True
|
|
app.ctx.watcher.join()
|