Initial commit
commit 7297eeba4b

server/app.py (new file, 156 lines)
@@ -0,0 +1,156 @@
from sanic import Sanic
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path, PurePosixPath
from hashlib import sha256
import secrets
import json
import unicodedata
import asyncio
from pathvalidate import sanitize_filepath
import os

ROOT = Path(os.environ.get("STORAGE", Path.cwd()))
secret = secrets.token_bytes(8)

def fuid(stat):
    """Return a short unique file ID derived from device and inode numbers."""
    return sha256((stat.st_dev << 32 | stat.st_ino).to_bytes(8, 'big') + secret).hexdigest()[:16]


def walk(path: Path = ROOT):
    """Scan path recursively: (size, mtime) for files, (size, mtime, tree) for directories, None on error."""
    try:
        s = path.stat()
        mtime = int(s.st_mtime)
        if path.is_file():
            return s.st_size, mtime

        tree = {p.name: v for p in path.iterdir() if not p.name.startswith('.') if (v := walk(p)) is not None}
        if tree:
            size = sum(v[0] for v in tree.values())
            mtime = max(v[1] for v in tree.values())
        else:
            size = 0
        return size, mtime, tree
    except OSError:
        return None


tree = walk()

def update(relpath: PurePosixPath):
    """Refresh the cached tree entry for relpath and notify all watchers."""
    ptr = tree[2]
    path = ROOT
    for name in relpath.parts[:-1]:
        path /= name
        try:
            ptr = ptr[name][2]
        except KeyError:
            break  # A parent is missing from the tree; rescan from here
    else:
        # All parents were found; rescan the final path component itself
        name = relpath.name
        path /= name
    new = walk(path)
    old = ptr.pop(name, None)
    if new is not None:
        ptr[name] = new
    if old == new:
        return
    print("Update", relpath, new)
    # TODO: update parents size/mtime
    msg = json.dumps({"update": {
        "path": relpath.as_posix(),
        "data": new,
    }})
    for queue in watchers.values():
        queue.put_nowait(msg)

app = Sanic(__name__)


@app.before_server_start
async def start_watcher(app, _):
    class Handler(FileSystemEventHandler):
        def on_any_event(self, event):
            update(Path(event.src_path).relative_to(ROOT))
    app.ctx.observer = Observer()
    app.ctx.observer.schedule(Handler(), str(ROOT), recursive=True)
    app.ctx.observer.start()


@app.after_server_stop
async def stop_watcher(app, _):
    app.ctx.observer.stop()
    app.ctx.observer.join()


app.static('/', "index.html", name="indexhtml")
app.static("/files", ROOT, use_content_range=True, stream_large_files=True, directory_view=True)

watchers = {}

@app.websocket('/api/watch')
async def watch(request, ws):
    try:
        q = watchers[ws] = asyncio.Queue()
        await ws.send(json.dumps({"root": tree}))
        while True:
            await ws.send(await q.get())
    finally:
        del watchers[ws]

@app.websocket('/api/upload')
async def upload(request, ws):
    file = None
    filename = None
    left = 0
    msg = {}
    try:
        async for data in ws:
            if isinstance(data, bytes):
                # Binary frame: file data for the currently open range
                if not file:
                    print(f"No file open, received {len(data)} bytes")
                    break
                if len(data) > left:
                    msg["error"] = "Too much data"
                    await ws.send(json.dumps(msg))
                    return
                left -= len(data)
                file.write(data)
                if left == 0:
                    msg["written"] = end - start
                    await ws.send(json.dumps(msg))
                    msg = {}
                continue

            # Text frame: JSON header describing the next byte range
            msg = json.loads(data)
            name = str(msg['name'])
            size = int(msg['size'])
            start = int(msg['start'])
            end = int(msg['end'])
            if not 0 <= start < end <= size:
                msg["error"] = "Invalid range"
                await ws.send(json.dumps(msg))
                return
            left = end - start
            if filename != name:
                if file:
                    file.close()
                    file, filename = None, None
                file = openfile(name)
                file.truncate(size)
                filename = name
            file.seek(start)

    finally:
        if file:
            file.close()


def openfile(name):
    # Name sanitation & security
    name = unicodedata.normalize("NFC", name).replace("\\", "")
    name = sanitize_filepath(name)
    p = PurePosixPath(name)
    if p.is_absolute() or any(n.startswith(".") for n in p.parts):
        raise ValueError("Invalid filename")
    # Create/open file
    path = ROOT / p
    path.parent.mkdir(parents=True, exist_ok=True)
    try:
        file = path.open("xb+")  # create new file
    except FileExistsError:
        file = path.open("rb+")  # write to existing file (along with other workers)
    return file
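The /api/upload socket above expects a JSON header naming the file and byte range (name, size, start, end), followed by binary frames covering exactly that range; /api/watch first sends {"root": tree} and then update messages. A minimal upload client sketch, assuming the third-party websockets package and a server listening on localhost:8000 (both assumptions, not part of this commit):

# Upload client sketch (illustrative only; assumes `websockets` and localhost:8000)
import asyncio, json, websockets

async def upload_file(path, name):
    data = open(path, "rb").read()
    async with websockets.connect("ws://localhost:8000/api/upload") as ws:
        # Header frame first, then the raw bytes for that range
        await ws.send(json.dumps({"name": name, "size": len(data), "start": 0, "end": len(data)}))
        await ws.send(data)
        print(await ws.recv())  # the header echoed back with "written" added

asyncio.run(upload_file("example.bin", "example.bin"))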
server/asynclink.py (new file, 93 lines)
@@ -0,0 +1,93 @@
import asyncio
from contextlib import suppress


class AsyncLink:
    """Facilitate two-way connection between asyncio and a worker thread."""

    def __init__(self):
        """Initialize; must be called from async context."""
        self.loop = asyncio.get_running_loop()
        self.queue = asyncio.Queue(maxsize=1)

    async def __call__(self, command) -> asyncio.Future:
        """Run command in worker thread; awaitable.

        Args:
            command: Command to run in worker thread.
        """
        fut = self.loop.create_future()
        await self.queue.put((command, fut))
        return await fut

    @property
    def to_sync(self):
        """Yield SyncRequests from async caller when called from worker thread."""
        while (req := self._await(self._get())) is not None:
            yield SyncRequest(self, req)

    async def _get(self):
        """Retrieve an item from the queue; handle cancellation."""
        with suppress(asyncio.CancelledError):
            ret = await self.queue.get()
            self.queue.task_done()
            return ret

    def _await(self, coro):
        """Run coroutine in main thread and return result; called from worker."""
        return asyncio.run_coroutine_threadsafe(coro, self.loop).result()

    async def stop(self):
        """Stop worker and clean up."""
        while not self.queue.empty():
            command, future = self.queue.get_nowait()
            if not future.done():
                future.set_exception(Exception("AsyncLink stopped"))
            self.queue.task_done()
        await self.queue.put(None)


async def set_result(fut: asyncio.Future, value=None, exception=None):
    """Set result or exception on an asyncio.Future object.

    Args:
        fut (asyncio.Future): Future to set result or exception on.
        value: Result to set on the future.
        exception: Exception to set on the future.
    """
    with suppress(asyncio.InvalidStateError):
        if exception is None:
            fut.set_result(value)
        else:
            fut.set_exception(exception)


class SyncRequest:
    """Handle values from sync thread in main asyncio event loop."""

    def __init__(self, alink: AsyncLink, req):
        """Initialize SyncRequest with AsyncLink and request."""
        self.alink = alink
        self.command, self.future = req
        self.done = False

    def __enter__(self):
        """Provide command to with-block and handle exceptions."""
        return self.command

    def __exit__(self, exc_type, exc, traceback):
        """Set result or exception on exit; suppress exceptions in with-block."""
        if exc:
            self.set_exception(exc)
            return True
        elif not self.done:
            self.set_result(None)

    def set_result(self, value):
        """Set result value; mark as done."""
        self.done = True
        self.alink._await(set_result(self.future, value))

    def set_exception(self, exc):
        """Set exception; mark as done."""
        self.done = True
        self.alink._await(set_result(self.future, exception=exc))
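A small usage sketch for AsyncLink (illustrative only, not part of this commit): the async side awaits the link like a function call, while a worker thread iterates to_sync and answers each request.

# AsyncLink usage sketch (assumes asynclink.py is importable)
import asyncio
from asynclink import AsyncLink

def worker(requests):
    # Runs in a thread: handle each command and send a result back to the event loop
    for req in requests:
        with req as command:
            req.set_result(f"handled {command}")

async def main():
    alink = AsyncLink()
    task = asyncio.get_running_loop().run_in_executor(None, worker, alink.to_sync)
    print(await alink("ping"))  # -> "handled ping"
    await alink.stop()          # queues the None sentinel so the worker exits
    await task

asyncio.run(main())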
server/fileio.py (new file, 83 lines)
@@ -0,0 +1,83 @@
import asyncio
import os

from asynclink import AsyncLink
from lrucache import LRUCache


class File:
    def __init__(self, filename):
        self.filename = filename
        self.fd = None
        self.writable = False

    def open_ro(self):
        self.close()
        self.fd = os.open(self.filename, os.O_RDONLY)

    def open_rw(self):
        self.close()
        self.fd = os.open(self.filename, os.O_RDWR | os.O_CREAT)
        self.writable = True

    def write(self, pos, buffer, *, file_size=None):
        if not self.writable:
            self.open_rw()
        if file_size is not None:
            os.ftruncate(self.fd, file_size)
        os.lseek(self.fd, pos, os.SEEK_SET)
        os.write(self.fd, buffer)

    def __getitem__(self, slice):
        if self.fd is None:
            self.open_ro()
        os.lseek(self.fd, slice.start, os.SEEK_SET)
        l = slice.stop - slice.start
        data = os.read(self.fd, l)
        if len(data) < l:
            raise EOFError("Error reading requested range")
        return data

    def close(self):
        if self.fd is not None:
            os.close(self.fd)
        self.fd = None
        self.writable = False

    def __del__(self):
        self.close()


class FileServer:

    async def start(self):
        self.alink = AsyncLink()
        self.worker = asyncio.get_event_loop().run_in_executor(None, self.worker_thread, self.alink.to_sync)

    async def stop(self):
        await self.alink.stop()
        await self.worker

    def worker_thread(self, slink):
        self.cache = LRUCache(File, capacity=10, maxage=5.0)
        for req in slink:
            with req as (command, msg, data):
                if command == "upload":
                    req.set_result(self.upload(msg, data))
                else:
                    raise NotImplementedError(f"Unhandled {command=} {msg}")

    def upload(self, msg, data):
        name = str(msg['name'])
        size = int(msg['size'])
        start = int(msg['start'])
        end = int(msg['end'])

        if not 0 <= start < end <= size:
            raise OverflowError("Invalid range")
        if end - start > 1 << 20:
            raise OverflowError("Too much data, max 1 MiB.")
        if end - start != len(data):
            raise ValueError("Data length does not match range")
        f = self.cache[name]
        f.write(start, data, file_size=size)

        return {"written": len(data), **msg}
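How FileServer is wired to the rest of the server is not shown in this commit; presumably a handler forwards ("upload", header, bytes) tuples through the AsyncLink. A sketch under that assumption:

# Illustrative only: assumed wiring, not in this commit (creates ./demo.bin when run)
import asyncio
from fileio import FileServer

async def demo():
    fs = FileServer()
    await fs.start()
    header = {"name": "demo.bin", "size": 4, "start": 0, "end": 4}
    # The worker thread unpacks ("upload", header, data) and replies with the header plus "written"
    reply = await fs.alink(("upload", header, b"data"))
    print(reply)  # {"written": 4, "name": "demo.bin", ...}
    await fs.stop()

asyncio.run(demo())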
server/lrucache.py (new file, 29 lines)
@@ -0,0 +1,29 @@
from time import monotonic


class LRUCache:
    def __init__(self, open: callable, *, capacity: int, maxage: float):
        self.open = open
        self.capacity = capacity
        self.maxage = maxage
        self.cache = []  # Each item is a tuple: (key, handle, timestamp), recent items first

    def __contains__(self, key):
        return any(rec[0] == key for rec in self.cache)

    def __getitem__(self, key):
        # Take from cache or open a new one
        for i, (k, f, ts) in enumerate(self.cache):
            if k == key:
                self.cache.pop(i)
                break
        else:
            f = self.open(key)
        # Add/restore to the front of the cache (most recent first)
        self.cache.insert(0, (key, f, monotonic()))
        self.expire_items()
        return f

    def expire_items(self):
        # Drop items beyond capacity or older than maxage (oldest are at the end)
        ts = monotonic() - self.maxage
        while len(self.cache) > self.capacity or self.cache and self.cache[-1][2] < ts:
            self.cache.pop()[1].close()
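A quick LRUCache usage sketch (illustrative, not part of this commit): the cache calls the given open callable on a miss and close() on whatever it evicts, so any handle-like object works.

# LRUCache usage sketch (assumes lrucache.py is importable)
from lrucache import LRUCache

class Handle:
    def __init__(self, key): self.key = key
    def close(self): print("closed", self.key)

cache = LRUCache(Handle, capacity=2, maxage=60.0)
a = cache["a"]           # miss: opens Handle("a")
assert cache["a"] is a   # hit: the same handle is returned
cache["b"]; cache["c"]   # over capacity: the oldest ("a") is evicted and closed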