From f697d96c89c85d5552276124965669e95c0cfbe8 Mon Sep 17 00:00:00 2001
From: Leo Vasanko
Date: Sun, 15 Oct 2023 01:29:50 +0300
Subject: [PATCH] Restructuring as a Python package.

---
 .gitignore                     |   1 +
 README.md                      |   0
 cista/__init__.py              |   1 +
 cista/app.py                   | 122 ++++++++++++++++++++++++++
 {server => cista}/asynclink.py |   1 +
 cista/fileio.py                |  95 ++++++++++++++++++++
 {server => cista}/lrucache.py  |   5 ++
 cista/protocol.py              |  46 ++++++++++
 cista/static/index.html        | 154 ++++++++++++++++++++++++++++++++
 cista/watching.py              |  58 ++++++++++++
 server/app.py                  | 156 ---------------------------------
 server/fileio.py               |  83 ------------------
 setup.py                       |  26 ++++++
 13 files changed, 509 insertions(+), 239 deletions(-)
 create mode 100644 README.md
 create mode 100644 cista/__init__.py
 create mode 100644 cista/app.py
 rename {server => cista}/asynclink.py (99%)
 create mode 100644 cista/fileio.py
 rename {server => cista}/lrucache.py (92%)
 create mode 100644 cista/protocol.py
 create mode 100644 cista/static/index.html
 create mode 100644 cista/watching.py
 delete mode 100644 server/app.py
 delete mode 100644 server/fileio.py
 create mode 100644 setup.py

diff --git a/.gitignore b/.gitignore
index 6ebe0b9..f6120d8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 .*
 !.gitignore
 __pycache__/
+*.egg-info/
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e69de29
diff --git a/cista/__init__.py b/cista/__init__.py
new file mode 100644
index 0000000..c07c459
--- /dev/null
+++ b/cista/__init__.py
@@ -0,0 +1 @@
+from .app import app
diff --git a/cista/app.py b/cista/app.py
new file mode 100644
index 0000000..36ed48b
--- /dev/null
+++ b/cista/app.py
@@ -0,0 +1,122 @@
+import asyncio
+import logging
+from pathlib import Path
+from tarfile import DEFAULT_FORMAT
+from typing import ParamSpecKwargs
+
+import msgspec
+from importlib.resources import files
+from sanic import Sanic
+from sanic.response import html
+from sanic.log import logger
+from watchdog.events import FileSystemEventHandler
+from watchdog.observers import Observer
+
+from . import watching
+from .fileio import ROOT, FileServer
+from .protocol import ErrorMsg, FileRange, StatusMsg
+
+app = Sanic("cista")
+fileserver = FileServer()
+
+def asend(ws, msg):
+    return ws.send(msg if isinstance(msg, bytes) else msgspec.json.encode(msg).decode())
+
+@app.before_server_start
+async def start_fileserver(app, _):
+    await fileserver.start()
+
+@app.after_server_stop
+async def stop_fileserver(app, _):
+    await fileserver.stop()
+
+@app.before_server_start
+async def start_watcher(app, _):
+    class Handler(FileSystemEventHandler):
+        def on_any_event(self, event):
+            watching.update(Path(event.src_path).relative_to(ROOT))
+    app.ctx.observer = Observer()
+    app.ctx.observer.schedule(Handler(), str(ROOT), recursive=True)
+    app.ctx.observer.start()
+
+@app.after_server_stop
+async def stop_watcher(app, _):
+    app.ctx.observer.stop()
+    app.ctx.observer.join()
+
+
+
+@app.get("/")
+async def index_page(request):
+    index = files("cista").joinpath("static", "index.html").read_text()
+    return html(index)
+
+app.static("/files", ROOT, use_content_range=True, stream_large_files=True, directory_view=True)
+
+@app.websocket('/api/watch')
+async def watch(request, ws):
+    try:
+        q = watching.pubsub[ws] = asyncio.Queue()
+        await asend(ws, {"root": watching.tree})
+        while True:
+            await asend(ws, await q.get())
+    finally:
+        del watching.pubsub[ws]
+
+@app.websocket('/api/upload')
+async def upload(request, ws):
+    alink = fileserver.alink
+    url = request.url_for("upload")
+    while True:
+        req = None
+        try:
+            text = await ws.recv()
+            if not isinstance(text, str):
+                raise ValueError(f"Expected JSON control, got binary len(data) = {len(text)}")
+            req = msgspec.json.decode(text, type=FileRange)
+            pos = req.start
+            while pos < req.end and (data := await ws.recv()) and isinstance(data, bytes):
+                pos += await alink(("upload", req.name, pos, data, req.size))
+            if pos != req.end:
+                d = f"{len(data)} bytes" if isinstance(data, bytes) else data
+                raise ValueError(f"Expected {req.end - pos} more bytes, got {d}")
+            # Report success
+            res = StatusMsg(status="upload", url=url, req=req)
+            await asend(ws, res)
+            print(res)
+
+        except Exception as e:
+            res = ErrorMsg(error=str(e), url=url, req=req)
+            await asend(ws, res)
+            logger.exception(repr(res), e)
+            return
+
+
+@app.websocket('/api/download')
+async def download(request, ws):
+    alink = fileserver.alink
+    url = request.url_for("download")
+    while True:
+        req = None
+        try:
+            text = await ws.recv()
+            if not isinstance(text, str):
+                raise ValueError(f"Expected JSON control, got binary len(data) = {len(text)}")
+            req = msgspec.json.decode(text, type=FileRange)
+            print("download", req)
+            pos = req.start
+            while pos < req.end:
+                end = min(req.end, pos + (1<<20))
+                data = await alink(("download", req.name, pos, end))
+                await asend(ws, data)
+                pos += len(data)
+            # Report success
+            res = StatusMsg(status="download", url=url, req=req)
+            await asend(ws, res)
+            print(res)
+
+        except Exception as e:
+            res = ErrorMsg(error=str(e), url=url, req=req)
+            await asend(ws, res)
+            logger.exception(repr(res), e)
+            return
diff --git a/server/asynclink.py b/cista/asynclink.py
similarity index 99%
rename from server/asynclink.py
rename to cista/asynclink.py
index 40dfd5c..85b98cc 100644
--- a/server/asynclink.py
+++ b/cista/asynclink.py
@@ -1,6 +1,7 @@
 import asyncio
 from contextlib import suppress
 
+
 class AsyncLink:
     """Facilitate two-way connection between asyncio and a worker thread."""
 
diff --git a/cista/fileio.py b/cista/fileio.py
new file mode 100644
index 0000000..a369bc4
--- /dev/null
+++ b/cista/fileio.py
@@ -0,0 +1,95 @@
+import asyncio
+import os
+import unicodedata
+from pathlib import Path
+
+from pathvalidate import sanitize_filepath
+
+from .asynclink import AsyncLink
+from .lrucache import LRUCache
+
+ROOT = Path(os.environ.get("STORAGE", Path.cwd()))
+
+def sanitize_filename(filename):
+    filename = unicodedata.normalize("NFC", filename)
+    filename = sanitize_filepath(filename)
+    filename = filename.replace("/", "-")
+    return filename
+
+class File:
+    def __init__(self, filename):
+        self.path = ROOT / filename
+        self.fd = None
+        self.writable = False
+
+    def open_ro(self):
+        self.close()
+        self.fd = os.open(self.path, os.O_RDONLY)
+
+    def open_rw(self):
+        self.close()
+        self.path.parent.mkdir(parents=True, exist_ok=True)
+        self.fd = os.open(self.path, os.O_RDWR | os.O_CREAT)
+        self.writable = True
+
+    def write(self, pos, buffer, *, file_size=None):
+        if not self.writable:
+            # Create/open file
+            self.open_rw()
+        if file_size is not None:
+            os.ftruncate(self.fd, file_size)
+        os.lseek(self.fd, pos, os.SEEK_SET)
+        os.write(self.fd, buffer)
+
+    def __getitem__(self, slice):
+        if self.fd is None:
+            self.open_ro()
+        os.lseek(self.fd, slice.start, os.SEEK_SET)
+        l = slice.stop - slice.start
+        data = os.read(self.fd, l)
+        if len(data) < l: raise EOFError("Error reading requested range")
+        return data
+
+    def close(self):
+        if self.fd is not None:
+            os.close(self.fd)
+            self.fd = self.writable = None
+
+    def __del__(self):
+        self.close()
+
+
+class FileServer:
+
+    async def start(self):
+        self.alink = AsyncLink()
+        self.worker = asyncio.get_event_loop().run_in_executor(None, self.worker_thread, self.alink.to_sync)
+        self.cache = LRUCache(File, capacity=10, maxage=5.0)
+
+    async def stop(self):
+        await self.alink.stop()
+        await self.worker
+
+    def worker_thread(self, slink):
+        try:
+            for req in slink:
+                with req as (command, *args):
+                    if command == "upload":
+                        req.set_result(self.upload(*args))
+                    elif command == "download":
+                        req.set_result(self.download(*args))
+                    else:
+                        raise NotImplementedError(f"Unhandled {command=} {args}")
+        finally:
+            self.cache.close()
+
+    def upload(self, name, pos, data, file_size):
+        name = sanitize_filename(name)
+        f = self.cache[name]
+        f.write(pos, data, file_size=file_size)
+        return len(data)
+
+    def download(self, name, start, end):
+        name = sanitize_filename(name)
+        f = self.cache[name]
+        return f[start: end]
diff --git a/server/lrucache.py b/cista/lrucache.py
similarity index 92%
rename from server/lrucache.py
rename to cista/lrucache.py
index 1e1fadf..18ebe66 100644
--- a/server/lrucache.py
+++ b/cista/lrucache.py
@@ -1,5 +1,6 @@
 from time import monotonic
 
+
 class LRUCache:
     def __init__(self, open: callable, *, capacity: int, maxage: float):
         self.open = open
@@ -27,3 +28,7 @@ class LRUCache:
         ts = monotonic() - self.maxage
         while len(self.cache) > self.capacity or self.cache and self.cache[-1][2] < ts:
             self.cache.pop()[1].close()
+
+    def close(self):
+        self.capacity = 0
+        self.expire_items()
diff --git a/cista/protocol.py b/cista/protocol.py
new file mode 100644
index 0000000..8d1b8ed
--- /dev/null
+++ b/cista/protocol.py
@@ -0,0 +1,46 @@
+from __future__ import annotations
+
+from typing import Dict, Tuple, Union
+
+import msgspec
+
+## File uploads and downloads
+
+class FileRange(msgspec.Struct):
+    name: str
+    size: int
+    start: int
+    end: int
+
+class ErrorMsg(msgspec.Struct):
+    error: str
+    req: FileRange
+    url: str
+
+class StatusMsg(msgspec.Struct):
+    status: str
+    url: str
+    req: FileRange
+
+
+## Directory listings
+
+class FileEntry(msgspec.Struct):
+    size: int
+    mtime: int
+
+class DirEntry(msgspec.Struct):
+    size: int
+    mtime: int
+    dir: Dict[str, Union[FileEntry, DirEntry]]
+
+def make_dir_data(root):
+    if len(root) == 2:
+        return FileEntry(*root)
+    size, mtime, listing = root
+    converted = {}
+    for name, data in listing.items():
+        converted[name] = make_dir_data(data)
+    sz = sum(x.size for x in converted.values())
+    mt = max(x.mtime for x in converted.values())
+    return DirEntry(sz, max(mt, mtime), converted)
diff --git a/cista/static/index.html b/cista/static/index.html
new file mode 100644
index 0000000..5e63ced
--- /dev/null
+++ b/cista/static/index.html
@@ -0,0 +1,154 @@
[The 154-line index.html body is not recoverable from this copy of the patch; only its visible page
text survives: title "Storage", with sections "Quick file upload" ("Uses parallel WebSocket
connections for increased bandwidth", /api/upload), "File downloads (websocket)", and "File listings"
("Plain HTML browser /files/", "JSON list updated via WebSocket /api/watch").]
diff --git a/cista/watching.py b/cista/watching.py
new file mode 100644
index 0000000..411ee36
--- /dev/null
+++ b/cista/watching.py
@@ -0,0 +1,58 @@
+import secrets
+from hashlib import sha256
+from pathlib import Path, PurePosixPath
+
+import msgspec
+
+from .fileio import ROOT
+from .protocol import DirEntry, FileEntry
+
+secret = secrets.token_bytes(8)
+pubsub = {}
+
+def fuid(stat):
+    return sha256((stat.st_dev << 32 | stat.st_ino).to_bytes(8, 'big') + secret).hexdigest()[:16]
+
+def walk(path: Path = ROOT):
+    try:
+        s = path.stat()
+        mtime = int(s.st_mtime)
+        if path.is_file():
+            return FileEntry(s.st_size, mtime)
+
+        tree = {p.name: v for p in path.iterdir() if not p.name.startswith('.') if (v := walk(p)) is not None}
+        if tree:
+            size = sum(v.size for v in tree.values())
+            mtime = max(mtime, max(v.mtime for v in tree.values()))
+        else:
+            size = 0
+        return DirEntry(size, mtime, tree)
+    except OSError as e:
+        print("OS error walking path", path, e)
+        return None
+
+tree = walk()
+
+def update(relpath: PurePosixPath):
+    ptr = tree.dir
+    path = ROOT
+    name = ""
+    for name in relpath.parts[:-1]:
+        path /= name
+        try:
+            ptr = ptr[name].dir
+        except KeyError:
+            break
+    new = walk(path)
+    old = ptr.pop(name, None)
+    if new is not None:
+        ptr[name] = new
+    if old == new:
+        return
+    print("Update", relpath)
+    # TODO: update parents size/mtime
+    msg = msgspec.json.encode({"update": {
+        "path": relpath.as_posix(),
+        "data": new,
+    }})
+    for queue in pubsub.values(): queue.put_nowait(msg)
diff --git a/server/app.py b/server/app.py
deleted file mode 100644
index 06c5011..0000000
--- a/server/app.py
+++ /dev/null
@@ -1,156 +0,0 @@
-from tarfile import DEFAULT_FORMAT
-from sanic import Sanic
-from watchdog.observers import Observer
-from watchdog.events import FileSystemEventHandler
-from pathlib import Path, PurePosixPath
-from hashlib import sha256
-import secrets
-import json
-import unicodedata
-import asyncio
-from pathvalidate import sanitize_filepath
-import os
-
-ROOT = Path(os.environ.get("STORAGE", Path.cwd()))
-secret = secrets.token_bytes(8)
-
-def fuid(stat):
-    return sha256((stat.st_dev << 32 | stat.st_ino).to_bytes(8, 'big') + secret).hexdigest()[:16]
-
-def walk(path: Path = ROOT):
-    try:
-        s = path.stat()
-        mtime = int(s.st_mtime)
-        if path.is_file():
-            return s.st_size, mtime
-
-        tree = {p.name: v for p in path.iterdir() if not p.name.startswith('.') if (v := walk(p)) is not None}
-        if tree:
-            size = sum(v[0] for v in tree.values())
-            mtime = max(v[1] for v in tree.values())
-        else:
-            size = 0
-        return size, mtime, tree
-    except OSError as e:
-        return None
-
-tree = walk()
-
-def update(relpath: PurePosixPath):
-    ptr = tree[2]
-    path = ROOT
-    name = ""
-    for name in relpath.parts[:-1]:
-        path /= name
-        try:
-            ptr = ptr[name][2]
-        except KeyError:
-            break
-    new = walk(path)
-    old = ptr.pop(name, None)
-    if new is not None:
-        ptr[name] = new
-    if old == new:
-        return
-    print("Update", relpath, new)
-    # TODO: update parents size/mtime
-    msg = json.dumps({"update": {
-        "path": relpath.as_posix(),
-        "data": new,
-    }})
-    for queue in watchers.values(): queue.put_nowait(msg)
-
-app = Sanic(__name__)
-
-@app.before_server_start
-async def start_watcher(app, _):
-    class Handler(FileSystemEventHandler):
-        def on_any_event(self, event):
-            update(Path(event.src_path).relative_to(ROOT))
-    app.ctx.observer = Observer()
-    app.ctx.observer.schedule(Handler(), str(ROOT), recursive=True)
-    app.ctx.observer.start()
-
-@app.after_server_stop
-async def stop_watcher(app, _):
-    app.ctx.observer.stop()
-    app.ctx.observer.join()
-
-app.static('/', "index.html", name="indexhtml")
-app.static("/files", ROOT, use_content_range=True, stream_large_files=True, directory_view=True)
-
-watchers = {}
-
-@app.websocket('/api/watch')
-async def watch(request, ws):
-    try:
-        q = watchers[ws] = asyncio.Queue()
-        await ws.send(json.dumps({"root": tree}))
-        while True:
-            await ws.send(await q.get())
-    finally:
-        del watchers[ws]
-
-@app.websocket('/api/upload')
-async def upload(request, ws):
-    file = None
-    filename = None
-    left = 0
-    msg = {}
-    try:
-        async for data in ws:
-            if isinstance(data, bytes):
-                if not file:
-                    print(f"No file open, received {len(data)} bytes")
-                    break
-                if len(data) > left:
-                    msg["error"] = "Too much data"
-                    ws.send(json.dumps(msg))
-                    return
-                left -= len(data)
-                file.write(data)
-                if left == 0:
-                    msg["written"] = end - start
-                    await ws.send(json.dumps(msg))
-                    msg = {}
-                continue
-
-            msg = json.loads(data)
-            name = str(msg['name'])
-            size = int(msg['size'])
-            start = int(msg['start'])
-            end = int(msg['end'])
-            if not 0 <= start < end <= size:
-                msg["error"] = "Invalid range"
-                ws.send(json.dumps(msg))
-                return
-            left = end - start
-            if filename != name:
-                if file:
-                    file.close()
-                    file, filename = None, None
-                file = openfile(name)
-                file.truncate(size)
-                filename = name
-            file.seek(start)
-
-    finally:
-        if file:
-            file.close()
-
-
-def openfile(name):
-    # Name sanitation & security
-    name = unicodedata.normalize("NFC", name).replace("\\", "")
-    name = sanitize_filepath(name)
-    p = PurePosixPath(name)
-    if p.is_absolute() or any(n.startswith(".") for n in p.parts):
-        raise ValueError("Invalid filename")
-    # Create/open file
-    path = ROOT / p
-    path.parent.mkdir(parents=True, exist_ok=True)
-    try:
-        file = path.open("xb+")  # create new file
-    except FileExistsError:
-        file = path.open("rb+")  # write to existing file (along with other workers)
-    return file
diff --git a/server/fileio.py b/server/fileio.py
deleted file mode 100644
index 0420d65..0000000
--- a/server/fileio.py
+++ /dev/null
@@ -1,83 +0,0 @@
-import asyncio
-import os
-
-from asynclink import AsyncLink
-from lrucache import LRUCache
-
-class File:
-    def __init__(self, filename):
-        self.filename = filename
-        self.fd = None
-        self.writable = False
-
-    def open_ro(self):
-        self.close()
-        self.fd = os.open(self.filename, os.O_RDONLY)
-
-    def open_rw(self):
-        self.close()
-        self.fd = os.open(self.filename, os.O_RDWR | os.O_CREAT)
-        self.writable = True
-
-    def write(self, pos, buffer, *, file_size=None):
-        if not self.writable:
-            self.open_rw()
-        if file_size is not None:
-            os.ftruncate(self.fd, file_size)
-        os.lseek(self.fd, pos, os.SEEK_SET)
-        os.write(self.fd, buffer)
-
-    def __getitem__(self, slice):
-        if self.fd is None:
-            self.open_ro()
-        os.lseek(self.fd, slice.start, os.SEEK_SET)
-        l = slice.stop - slice.start
-        data = os.read(self.fd, l)
-        if len(data) < l: raise EOFError("Error reading requested range")
-        return data
-
-    def close(self):
-        if self.fd is not None:
-            os.close(self.fd)
-            self.fd = self.writable = None
-
-    def __del__(self):
-        self.close()
-
-
-class FileServer:
-
-    async def start(self):
-        self.alink = AsyncLink()
-        self.worker = asyncio.get_event_loop().run_in_executor(None, self.worker_thread, alink.to_sync)
-
-    async def stop(self):
-        await self.alink.stop()
-        await self.worker
-
-    def worker_thread(self, slink):
-        cache = LRUCache(File, capacity=10, maxage=5.0)
-        for req in slink:
-            with req as (command, msg, data):
-                if command == "upload":
-                    req.set_result(self.upload(msg, data))
-                else:
-                    raise NotImplementedError(f"Unhandled {command=} {msg}")
-
-
-    def upload(self, msg, data):
-        name = str(msg['name'])
-        size = int(msg['size'])
-        start = int(msg['start'])
-        end = int(msg['end'])
-
-        if not 0 <= start < end <= size:
-            raise OverflowError("Invalid range")
-        if end - start > 1<<20:
-            raise OverflowError("Too much data, max 1 MiB.")
-        if end - start != len(data):
-            raise ValueError("Data length does not match range")
-        f = self.cache[name]
-        f.write(start, data, file_size=size)
-
-        return {"written": len(data), **msg}
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..01a2513
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,26 @@
+# Install package
+import setuptools
+import pathlib
+
+doc = pathlib.Path(__file__).parent.joinpath('README.md').read_text(encoding="UTF-8")
+
+setuptools.setup(
+    name="cista",
+    version="0.0.1",
+    author="Vasanko lda",
+    description="Dropbox-like file server with modern web interface",
+    long_description=doc,
+    long_description_content_type="text/markdown",
+    url="",
+    packages=setuptools.find_packages(),
+    classifiers=[
+        "Programming Language :: Python :: 3",
+        "License :: OSI Approved :: Apache Software License",
+    ],
+    install_requires=[
+        "sanic",
+        "msgspec",
+        "watchdog",
+        "pathvalidate",
+    ],
+)
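
Usage note (not part of the patch): cista/__init__.py now re-exports the Sanic `app`, so the
repackaged server can be started programmatically. The snippet below is a minimal sketch; the
launcher filename, host, port and storage path are illustrative assumptions, while STORAGE is the
environment variable the patched fileio.py reads (defaulting to the current working directory).

    # run.py -- hypothetical launcher, not included in this commit
    import os

    # STORAGE must be set before importing cista, because fileio.py reads it at import time.
    os.environ.setdefault("STORAGE", "/srv/files")  # directory to serve; illustrative path

    from cista import app  # the Sanic application re-exported by cista/__init__.py

    if __name__ == "__main__":
        # Arbitrary host/port chosen for this sketch; Sanic's app.run() starts the server.
        app.run(host="127.0.0.1", port=8000)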