From 7297eeba4b0b7503ead848b772ace16c3094773f Mon Sep 17 00:00:00 2001 From: Leo Vasanko Date: Sat, 14 Oct 2023 06:07:27 +0300 Subject: [PATCH] Initial commit --- server/app.py | 156 ++++++++++++++++++++++++++++++++++++++++++++ server/asynclink.py | 93 ++++++++++++++++++++++++++ server/fileio.py | 83 +++++++++++++++++++++++ server/lrucache.py | 29 ++++++++ 4 files changed, 361 insertions(+) create mode 100644 server/app.py create mode 100644 server/asynclink.py create mode 100644 server/fileio.py create mode 100644 server/lrucache.py diff --git a/server/app.py b/server/app.py new file mode 100644 index 0000000..06c5011 --- /dev/null +++ b/server/app.py @@ -0,0 +1,156 @@ +from tarfile import DEFAULT_FORMAT +from sanic import Sanic +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler +from pathlib import Path, PurePosixPath +from hashlib import sha256 +import secrets +import json +import unicodedata +import asyncio +from pathvalidate import sanitize_filepath +import os + +ROOT = Path(os.environ.get("STORAGE", Path.cwd())) +secret = secrets.token_bytes(8) + +def fuid(stat): + return sha256((stat.st_dev << 32 | stat.st_ino).to_bytes(8, 'big') + secret).hexdigest()[:16] + +def walk(path: Path = ROOT): + try: + s = path.stat() + mtime = int(s.st_mtime) + if path.is_file(): + return s.st_size, mtime + + tree = {p.name: v for p in path.iterdir() if not p.name.startswith('.') if (v := walk(p)) is not None} + if tree: + size = sum(v[0] for v in tree.values()) + mtime = max(v[1] for v in tree.values()) + else: + size = 0 + return size, mtime, tree + except OSError as e: + return None + +tree = walk() + +def update(relpath: PurePosixPath): + ptr = tree[2] + path = ROOT + name = "" + for name in relpath.parts[:-1]: + path /= name + try: + ptr = ptr[name][2] + except KeyError: + break + new = walk(path) + old = ptr.pop(name, None) + if new is not None: + ptr[name] = new + if old == new: + return + print("Update", relpath, new) + # TODO: update parents size/mtime + msg = json.dumps({"update": { + "path": relpath.as_posix(), + "data": new, + }}) + for queue in watchers.values(): queue.put_nowait(msg) + +app = Sanic(__name__) + +@app.before_server_start +async def start_watcher(app, _): + class Handler(FileSystemEventHandler): + def on_any_event(self, event): + update(Path(event.src_path).relative_to(ROOT)) + app.ctx.observer = Observer() + app.ctx.observer.schedule(Handler(), str(ROOT), recursive=True) + app.ctx.observer.start() + +@app.after_server_stop +async def stop_watcher(app, _): + app.ctx.observer.stop() + app.ctx.observer.join() + +app.static('/', "index.html", name="indexhtml") +app.static("/files", ROOT, use_content_range=True, stream_large_files=True, directory_view=True) + +watchers = {} + +@app.websocket('/api/watch') +async def watch(request, ws): + try: + q = watchers[ws] = asyncio.Queue() + await ws.send(json.dumps({"root": tree})) + while True: + await ws.send(await q.get()) + finally: + del watchers[ws] + +@app.websocket('/api/upload') +async def upload(request, ws): + file = None + filename = None + left = 0 + msg = {} + try: + async for data in ws: + if isinstance(data, bytes): + if not file: + print(f"No file open, received {len(data)} bytes") + break + if len(data) > left: + msg["error"] = "Too much data" + ws.send(json.dumps(msg)) + return + left -= len(data) + file.write(data) + if left == 0: + msg["written"] = end - start + await ws.send(json.dumps(msg)) + msg = {} + continue + + msg = json.loads(data) + name = str(msg['name']) + size = int(msg['size']) + start = int(msg['start']) + end = int(msg['end']) + if not 0 <= start < end <= size: + msg["error"] = "Invalid range" + ws.send(json.dumps(msg)) + return + left = end - start + if filename != name: + if file: + file.close() + file, filename = None, None + file = openfile(name) + file.truncate(size) + filename = name + file.seek(start) + + finally: + if file: + file.close() + + +def openfile(name): + # Name sanitation & security + name = unicodedata.normalize("NFC", name).replace("\\", "") + name = sanitize_filepath(name) + p = PurePosixPath(name) + if p.is_absolute() or any(n.startswith(".") for n in p.parts): + raise ValueError("Invalid filename") + # Create/open file + path = ROOT / p + path.parent.mkdir(parents=True, exist_ok=True) + try: + file = path.open("xb+") # create new file + except FileExistsError: + file = path.open("rb+") # write to existing file (along with other workers) + return file diff --git a/server/asynclink.py b/server/asynclink.py new file mode 100644 index 0000000..40dfd5c --- /dev/null +++ b/server/asynclink.py @@ -0,0 +1,93 @@ +import asyncio +from contextlib import suppress + +class AsyncLink: + """Facilitate two-way connection between asyncio and a worker thread.""" + + def __init__(self): + """Initialize; must be called from async context.""" + self.loop = asyncio.get_running_loop() + self.queue = asyncio.Queue(maxsize=1) + + async def __call__(self, command) -> asyncio.Future: + """Run command in worker thread; awaitable. + + Args: + command: Command to run in worker thread. + """ + fut = self.loop.create_future() + await self.queue.put((command, fut)) + return await fut + + @property + def to_sync(self): + """Yield SyncRequests from async caller when called from worker thread.""" + while (req := self._await(self._get())) is not None: + yield SyncRequest(self, req) + + async def _get(self): + """Retrieve an item from the queue; handle cancellation.""" + with suppress(asyncio.CancelledError): + ret = await self.queue.get() + self.queue.task_done() + return ret + + def _await(self, coro): + """Run coroutine in main thread and return result; called from worker.""" + return asyncio.run_coroutine_threadsafe(coro, self.loop).result() + + async def stop(self): + """Stop worker and clean up.""" + while not self.queue.empty(): + command, future = self.queue.get_nowait() + if not future.done(): + future.set_exception(Exception("AsyncLink stopped")) + self.queue.task_done() + await self.queue.put(None) + + +async def set_result(fut: asyncio.Future, value=None, exception=None): + """Set result or exception on an asyncio.Future object. + + Args: + fut (asyncio.Future): Future to set result or exception on. + value: Result to set on the future. + exception: Exception to set on the future. + """ + with suppress(asyncio.InvalidStateError): + if exception is None: + fut.set_result(value) + else: + fut.set_exception(exception) + + +class SyncRequest: + """Handle values from sync thread in main asyncio event loop.""" + + def __init__(self, alink: AsyncLink, req): + """Initialize SyncRequest with AsyncLink and request.""" + self.alink = alink + self.command, self.future = req + self.done = False + + def __enter__(self): + """Provide command to with-block and handle exceptions.""" + return self.command + + def __exit__(self, exc_type, exc, traceback): + """Set result or exception on exit; suppress exceptions in with-block.""" + if exc: + self.set_exception(exc) + return True + elif not self.done: + self.set_result(None) + + def set_result(self, value): + """Set result value; mark as done.""" + self.done = True + self.alink._await(set_result(self.future, value)) + + def set_exception(self, exc): + """Set exception; mark as done.""" + self.done = True + self.alink._await(set_result(self.future, exception=exc)) diff --git a/server/fileio.py b/server/fileio.py new file mode 100644 index 0000000..0420d65 --- /dev/null +++ b/server/fileio.py @@ -0,0 +1,83 @@ +import asyncio +import os + +from asynclink import AsyncLink +from lrucache import LRUCache + +class File: + def __init__(self, filename): + self.filename = filename + self.fd = None + self.writable = False + + def open_ro(self): + self.close() + self.fd = os.open(self.filename, os.O_RDONLY) + + def open_rw(self): + self.close() + self.fd = os.open(self.filename, os.O_RDWR | os.O_CREAT) + self.writable = True + + def write(self, pos, buffer, *, file_size=None): + if not self.writable: + self.open_rw() + if file_size is not None: + os.ftruncate(self.fd, file_size) + os.lseek(self.fd, pos, os.SEEK_SET) + os.write(self.fd, buffer) + + def __getitem__(self, slice): + if self.fd is None: + self.open_ro() + os.lseek(self.fd, slice.start, os.SEEK_SET) + l = slice.stop - slice.start + data = os.read(self.fd, l) + if len(data) < l: raise EOFError("Error reading requested range") + return data + + def close(self): + if self.fd is not None: + os.close(self.fd) + self.fd = self.writable = None + + def __del__(self): + self.close() + + +class FileServer: + + async def start(self): + self.alink = AsyncLink() + self.worker = asyncio.get_event_loop().run_in_executor(None, self.worker_thread, alink.to_sync) + + async def stop(self): + await self.alink.stop() + await self.worker + + def worker_thread(self, slink): + cache = LRUCache(File, capacity=10, maxage=5.0) + for req in slink: + with req as (command, msg, data): + if command == "upload": + req.set_result(self.upload(msg, data)) + else: + raise NotImplementedError(f"Unhandled {command=} {msg}") + + + def upload(self, msg, data): + name = str(msg['name']) + size = int(msg['size']) + start = int(msg['start']) + end = int(msg['end']) + + if not 0 <= start < end <= size: + raise OverflowError("Invalid range") + if end - start > 1<<20: + raise OverflowError("Too much data, max 1 MiB.") + if end - start != len(data): + raise ValueError("Data length does not match range") + f = self.cache[name] + f.write(start, data, file_size=size) + + return {"written": len(data), **msg} diff --git a/server/lrucache.py b/server/lrucache.py new file mode 100644 index 0000000..1e1fadf --- /dev/null +++ b/server/lrucache.py @@ -0,0 +1,29 @@ +from time import monotonic + +class LRUCache: + def __init__(self, open: callable, *, capacity: int, maxage: float): + self.open = open + self.capacity = capacity + self.maxage = maxage + self.cache = [] # Each item is a tuple: (key, handle, timestamp), recent items first + + def __contains__(self, key): + return any(rec[0] == key for rec in self.cache) + + def __getitem__(self, key): + # Take from cache or open a new one + for i, (k, f, ts) in enumerate(self.cache): + if k == key: + self.cache.pop(i) + break + else: + f = self.open(key) + # Add/restore to end of cache + self.cache.append((key, f, monotonic())) + self.expire_items() + return f + + def expire_items(self): + ts = monotonic() - self.maxage + while len(self.cache) > self.capacity or self.cache and self.cache[-1][2] < ts: + self.cache.pop()[1].close()