Watcher rewritten with inotify module, bugs fixed.

This commit is contained in:
Leo Vasanko 2023-10-19 23:52:37 +03:00 committed by Leo Vasanko
parent 228b75a20d
commit 9939cb33fa
4 changed files with 45 additions and 21 deletions

View File

@ -52,7 +52,7 @@ def _main():
listen = args["-l"] listen = args["-l"]
# Validate arguments first # Validate arguments first
if args["<path>"]: if args["<path>"]:
path = Path(args["<path>"]) path = Path(args["<path>"]).resolve()
if not path.is_dir(): if not path.is_dir():
raise ValueError(f"No such directory: {path}") raise ValueError(f"No such directory: {path}")
else: else:

View File

@ -54,9 +54,11 @@ function createWatchSocket() {
createWatchSocket() createWatchSocket()
function tree_update(msg) { function tree_update(msg) {
console.log("Tree update", msg)
let node = files let node = files
for (const elem of msg) { for (const elem of msg) {
if (elem.deleted) { if (elem.deleted) {
const p = node.dir[elem.name].path
delete node.dir[elem.name] delete node.dir[elem.name]
delete flatfiles[p] delete flatfiles[p]
break break

View File

@ -1,16 +1,47 @@
import asyncio import asyncio
import secrets
import threading import threading
import time
from pathlib import Path, PurePosixPath from pathlib import Path, PurePosixPath
from socket import timeout
import inotify.adapters
import msgspec import msgspec
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from . import config from . import config
from .protocol import DirEntry, FileEntry, UpdateEntry from .protocol import DirEntry, FileEntry, UpdateEntry
pubsub = {} pubsub = {}
tree = {"": None}
tree_lock = threading.Lock()
rootpath = None
quit = False
modified_flags = "IN_CREATE", "IN_DELETE", "IN_DELETE_SELF", "IN_MODIFY", "IN_MOVE_SELF", "IN_MOVED_FROM", "IN_MOVED_TO"
def watcher_thread(loop):
while True:
i = inotify.adapters.InotifyTree(rootpath.as_posix())
old = refresh() if tree[""] else None
with tree_lock:
# Initialize the tree from filesystem
tree[""] = walk(rootpath)
msg = refresh()
if msg != old:
asyncio.run_coroutine_threadsafe(broadcast(msg), loop)
# The watching is not entirely reliable, so do a full refresh every minute
refreshdl = time.monotonic() + 60.0
for event in i.event_gen():
if quit: return
if time.monotonic() > refreshdl: break
if event is None: continue
_, flags, path, filename = event
if not any(f in modified_flags for f in flags):
continue
path = PurePosixPath(path) / filename
#print(path, flags)
update(path.relative_to(rootpath), loop)
i = None # Free the inotify object
def walk(path: Path) -> DirEntry | FileEntry | None: def walk(path: Path) -> DirEntry | FileEntry | None:
try: try:
@ -32,10 +63,6 @@ def walk(path: Path) -> DirEntry | FileEntry | None:
print("OS error walking path", path, e) print("OS error walking path", path, e)
return None return None
tree = {"": None}
tree_lock = threading.Lock()
rootpath = None
def refresh(): def refresh():
root = tree[""] root = tree[""]
return msgspec.json.encode({"update": [ return msgspec.json.encode({"update": [
@ -49,7 +76,6 @@ def update(relpath: Path, loop):
update = update_internal(relpath, new) update = update_internal(relpath, new)
if not update: return # No changes if not update: return # No changes
msg = msgspec.json.encode({"update": update}).decode() msg = msgspec.json.encode({"update": update}).decode()
print(msg)
asyncio.run_coroutine_threadsafe(broadcast(msg), loop) asyncio.run_coroutine_threadsafe(broadcast(msg), loop)
def update_internal(relpath: PurePosixPath, new: DirEntry | FileEntry | None) -> list[UpdateEntry]: def update_internal(relpath: PurePosixPath, new: DirEntry | FileEntry | None) -> list[UpdateEntry]:
@ -88,7 +114,6 @@ def update_internal(relpath: PurePosixPath, new: DirEntry | FileEntry | None) ->
u.mtime = entry.mtime = mt u.mtime = entry.mtime = mt
update.append(u) update.append(u)
# The last element is the one that changed # The last element is the one that changed
print([e[0] for e in elems])
name, entry = elems[-1] name, entry = elems[-1]
parent = elems[-2][1] if len(elems) > 1 else tree parent = elems[-2][1] if len(elems) > 1 else tree
u = UpdateEntry(name) u = UpdateEntry(name)
@ -113,27 +138,24 @@ def register(app, url):
async def start_watcher(app, loop): async def start_watcher(app, loop):
global rootpath global rootpath
config.load_config() config.load_config()
# Initialize the tree from filesystem
rootpath = config.config.path rootpath = config.config.path
tree[""] = walk(rootpath) app.ctx.watcher = threading.Thread(target=watcher_thread, args=[loop])
class Handler(FileSystemEventHandler): app.ctx.watcher.start()
def on_any_event(self, event):
update(Path(event.src_path).relative_to(rootpath), loop)
app.ctx.observer = Observer()
app.ctx.observer.schedule(Handler(), str(rootpath), recursive=True)
app.ctx.observer.start()
@app.after_server_stop @app.after_server_stop
async def stop_watcher(app, _): async def stop_watcher(app, _):
app.ctx.observer.stop() global quit
app.ctx.observer.join() quit = True
app.ctx.watcher.join()
@app.websocket(url) @app.websocket(url)
async def watch(request, ws): async def watch(request, ws):
try: try:
with tree_lock: with tree_lock:
q = pubsub[ws] = asyncio.Queue() q = pubsub[ws] = asyncio.Queue()
# Init with full tree
await ws.send(refresh()) await ws.send(refresh())
# Send updates
while True: while True:
await ws.send(await q.get()) await ws.send(await q.get())
finally: finally:

View File

@ -16,12 +16,12 @@ classifiers = [
dependencies = [ dependencies = [
"argon2-cffi", "argon2-cffi",
"docopt", "docopt",
"inotify",
"msgspec", "msgspec",
"pathvalidate", "pathvalidate",
"pyjwt", "pyjwt",
"sanic", "sanic",
"tomli_w", "tomli_w",
"watchdog",
] ]
[project.urls] [project.urls]